diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -17,6 +17,7 @@ #include <interfaces/handler.h> #include <key.h> #include <net.h> +#include <primitives/transaction.h> #include <rwcollection.h> #include <util/variant.h> @@ -67,7 +68,8 @@ Stale, }; -using AnyVoteItem = std::variant<const ProofRef, const CBlockIndex *>; +using AnyVoteItem = + std::variant<const ProofRef, const CBlockIndex *, const CTransactionRef>; class VoteItemUpdate { AnyVoteItem item; @@ -97,6 +99,9 @@ // Reverse ordering so we get the highest work first return CBlockIndexWorkComparator()(rhs, lhs); }, + [](const CTransactionRef &lhs, const CTransactionRef &rhs) { + return lhs->GetId() < rhs->GetId(); + }, [](const auto &lhs, const auto &rhs) { // This serves 2 purposes: // - This makes sure that we don't forget to implement a @@ -129,6 +134,7 @@ Config avaconfig; CConnman *connman; ChainstateManager &chainman; + CTxMemPool *mempool; /** * Items to run avalanche on. @@ -214,9 +220,9 @@ delayedAvahelloNodeIds GUARDED_BY(cs_delayedAvahelloNodeIds); Processor(Config avaconfig, interfaces::Chain &chain, CConnman *connmanIn, - ChainstateManager &chainman, CScheduler &scheduler, - std::unique_ptr<PeerData> peerDataIn, CKey sessionKeyIn, - uint32_t minQuorumTotalScoreIn, + ChainstateManager &chainman, CTxMemPool *mempoolIn, + CScheduler &scheduler, std::unique_ptr<PeerData> peerDataIn, + CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn, Amount stakeUtxoDustThresholdIn); @@ -227,7 +233,8 @@ static std::unique_ptr<Processor> MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, ChainstateManager &chainman, - CScheduler &scheduler, bilingual_str &error); + CTxMemPool *mempoolIn, CScheduler &scheduler, + bilingual_str &error); bool addToReconcile(const AnyVoteItem &item); bool isAccepted(const AnyVoteItem &item) const; @@ -303,6 +310,7 @@ LOCKS_EXCLUDED(cs_main); bool operator()(const ProofRef &proof) const LOCKS_EXCLUDED(cs_peerManager); + bool operator()(const CTransactionRef &tx) const; }; bool isWorthPolling(const AnyVoteItem &item) const { return std::visit(IsWorthPolling(*this), item); @@ -318,6 +326,7 @@ LOCKS_EXCLUDED(cs_main); bool operator()(const ProofRef &proof) const LOCKS_EXCLUDED(cs_peerManager); + bool operator()(const CTransactionRef &tx) const; }; bool getLocalAcceptance(const AnyVoteItem &item) const { return std::visit(GetLocalAcceptance(*this), item); diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -143,7 +143,7 @@ Processor::Processor(Config avaconfigIn, interfaces::Chain &chain, CConnman *connmanIn, ChainstateManager &chainmanIn, - CScheduler &scheduler, + CTxMemPool *mempoolIn, CScheduler &scheduler, std::unique_ptr<PeerData> peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, @@ -151,8 +151,9 @@ uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn, Amount stakeUtxoDustThreshold) : avaconfig(std::move(avaconfigIn)), connman(connmanIn), - chainman(chainmanIn), round(0), peerManager(std::make_unique<PeerManager>( - stakeUtxoDustThreshold, chainman)), + chainman(chainmanIn), mempool(mempoolIn), round(0), + peerManager( + std::make_unique<PeerManager>(stakeUtxoDustThreshold, chainman)), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), minQuorumScore(minQuorumTotalScoreIn), minQuorumConnectedScoreRatio(minQuorumConnectedScoreRatioIn), @@ -180,7 +181,8 @@ std::unique_ptr<Processor> Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, ChainstateManager &chainman, - CScheduler &scheduler, bilingual_str &error) { + CTxMemPool *mempool, CScheduler &scheduler, + bilingual_str &error) { std::unique_ptr<PeerData> peerData; CKey masterKey; CKey sessionKey; @@ -353,7 +355,7 @@ // We can't use std::make_unique with a private constructor return std::unique_ptr<Processor>(new Processor( - std::move(avaconfig), chain, connman, chainman, scheduler, + std::move(avaconfig), chain, connman, chainman, mempool, scheduler, std::move(peerData), std::move(sessionKey), Proof::amountToScore(minQuorumStake), minQuorumConnectedStakeRatio, minAvaproofsNodeCount, staleVoteThreshold, staleVoteFactor, @@ -907,6 +909,7 @@ [](const CBlockIndex *pindex) { return CInv(MSG_BLOCK, pindex->GetBlockHash()); }, + [](const CTransactionRef &tx) { return CInv(MSG_TX, tx->GetHash()); }, }; auto r = voteRecords.getReadView(); @@ -941,6 +944,10 @@ return peerManager->getProof(ProofId(inv.hash))); } + if (mempool && inv.IsMsgTx()) { + return WITH_LOCK(mempool->cs, return mempool->get(TxId(inv.hash))); + } + return {nullptr}; } @@ -978,6 +985,18 @@ processor.peerManager->isInConflictingPool(proofid); } +bool Processor::IsWorthPolling::operator()(const CTransactionRef &tx) const { + if (!processor.mempool) { + return false; + } + + // TODO For now the transactions with conflicts or rejected by policies are + // not stored anywhere, so only the mempool transactions are worth polling. + AssertLockNotHeld(processor.mempool->cs); + return WITH_LOCK(processor.mempool->cs, + return processor.mempool->exists(tx->GetId())); +} + bool Processor::GetLocalAcceptance::operator()( const CBlockIndex *pindex) const { AssertLockNotHeld(cs_main); @@ -994,4 +1013,16 @@ return processor.peerManager->isBoundToPeer(proof->getId())); } +bool Processor::GetLocalAcceptance::operator()( + const CTransactionRef &tx) const { + if (!processor.mempool) { + return false; + } + + AssertLockNotHeld(processor.mempool->cs); + + return WITH_LOCK(processor.mempool->cs, + return processor.mempool->exists(tx->GetId())); +} + } // namespace avalanche diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -119,7 +119,8 @@ bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), - *Assert(m_node.chainman), *m_node.scheduler, error); + *Assert(m_node.chainman), m_node.mempool.get(), *m_node.scheduler, + error); BOOST_CHECK(m_processor); } @@ -336,12 +337,80 @@ } }; +struct TxProvider { + AvalancheTestingSetup *fixture; + + std::vector<avalanche::VoteItemUpdate> updates; + uint32_t invType; + + TxProvider(AvalancheTestingSetup *_fixture) + : fixture(_fixture), invType(MSG_TX) {} + + CTransactionRef buildVoteItem() const { + CMutableTransaction mtx; + mtx.nVersion = 2; + mtx.vin.emplace_back(COutPoint{TxId(FastRandomContext().rand256()), 0}); + mtx.vout.emplace_back(1 * COIN, CScript() << OP_TRUE); + + CTransactionRef tx = MakeTransactionRef(std::move(mtx)); + + TestMemPoolEntryHelper mempoolEntryHelper; + auto entry = mempoolEntryHelper.FromTx(tx); + + CTxMemPool *mempool = Assert(fixture->m_node.mempool.get()); + { + LOCK2(cs_main, mempool->cs); + mempool->addUnchecked(entry); + BOOST_CHECK(mempool->exists(tx->GetId())); + } + + return tx; + } + + uint256 getVoteItemId(const CTransactionRef &tx) const { + return tx->GetId(); + } + + std::vector<Vote> buildVotesForItems(uint32_t error, + std::vector<CTransactionRef> &&items) { + size_t numItems = items.size(); + + std::vector<Vote> votes; + votes.reserve(numItems); + + // Transactions are sorted by TxId + std::sort(items.begin(), items.end(), + [](const CTransactionRef &lhs, const CTransactionRef &rhs) { + return lhs->GetId() < rhs->GetId(); + }); + for (auto &item : items) { + votes.emplace_back(error, item->GetId()); + } + + return votes; + } + + void invalidateItem(const CTransactionRef &tx) { + BOOST_CHECK(tx != nullptr); + CTxMemPool *mempool = Assert(fixture->m_node.mempool.get()); + + LOCK(mempool->cs); + mempool->removeRecursive(*tx, MemPoolRemovalReason::CONFLICT); + BOOST_CHECK(!mempool->exists(tx->GetId())); + } + + const CTransactionRef fromAnyVoteItem(const AnyVoteItem &item) { + return std::get<const CTransactionRef>(item); + } +}; + } // namespace BOOST_FIXTURE_TEST_SUITE(processor_tests, AvalancheTestingSetup) // FIXME A std::tuple can be used instead of boost::mpl::list after boost 1.67 -using VoteItemProviders = boost::mpl::list<BlockProvider, ProofProvider>; +using VoteItemProviders = + boost::mpl::list<BlockProvider, ProofProvider, TxProvider>; BOOST_AUTO_TEST_CASE_TEMPLATE(voteitemupdate, P, VoteItemProviders) { P provider(this); @@ -849,9 +918,9 @@ setArg("-avatimeout", ToString(queryTimeDuration.count())); bilingual_str error; - m_processor = Processor::MakeProcessor(*m_node.args, *m_node.chain, - m_node.connman.get(), chainman, - *m_node.scheduler, error); + m_processor = Processor::MakeProcessor( + *m_node.args, *m_node.chain, m_node.connman.get(), chainman, + m_node.mempool.get(), *m_node.scheduler, error); const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); @@ -1286,9 +1355,9 @@ bilingual_str error; ChainstateManager &chainman = *Assert(m_node.chainman); - m_processor = Processor::MakeProcessor(*m_node.args, *m_node.chain, - m_node.connman.get(), chainman, - *m_node.scheduler, error); + m_processor = Processor::MakeProcessor( + *m_node.args, *m_node.chain, m_node.connman.get(), chainman, + m_node.mempool.get(), *m_node.scheduler, error); BOOST_CHECK(m_processor != nullptr); BOOST_CHECK(m_processor->getLocalProof() != nullptr); @@ -1481,7 +1550,8 @@ bilingual_str error; std::unique_ptr<Processor> processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), - *Assert(m_node.chainman), *m_node.scheduler, error); + *Assert(m_node.chainman), m_node.mempool.get(), *m_node.scheduler, + error); if (success) { BOOST_CHECK(processor != nullptr); @@ -1504,7 +1574,7 @@ bilingual_str error; auto processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), chainman, - *m_node.scheduler, error); + m_node.mempool.get(), *m_node.scheduler, error); auto addNode = [&](NodeId nodeid) { auto proof = buildRandomProof(chainman.ActiveChainstate(), @@ -1569,7 +1639,8 @@ bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), - *Assert(m_node.chainman), *m_node.scheduler, error); + *Assert(m_node.chainman), m_node.mempool.get(), *m_node.scheduler, + error); BOOST_CHECK(m_processor != nullptr); BOOST_CHECK(error.empty()); diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -2424,8 +2424,8 @@ // Step 6.5 (I guess ?): Initialize Avalanche. bilingual_str avalancheError; g_avalanche = avalanche::Processor::MakeProcessor( - args, *node.chain, node.connman.get(), chainman, *node.scheduler, - avalancheError); + args, *node.chain, node.connman.get(), chainman, node.mempool.get(), + *node.scheduler, avalancheError); if (!g_avalanche) { InitError(avalancheError); return false; diff --git a/src/test/net_tests.cpp b/src/test/net_tests.cpp --- a/src/test/net_tests.cpp +++ b/src/test/net_tests.cpp @@ -1064,7 +1064,7 @@ // slots are not allocated. g_avalanche = avalanche::Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *m_node.chainman, - *m_node.scheduler, error); + m_node.mempool.get(), *m_node.scheduler, error); BOOST_CHECK(g_avalanche); CConnman::Options options;