diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -223,6 +223,20 @@ fFeeEstimatesInitialized = false; } + // FlushStateToDisk generates a SetBestChain callback, which we should avoid + // missing + FlushStateToDisk(); + + // After there are no more peers/RPC left to give us new data which may + // generate CValidationInterface callbacks, flush them... + GetMainSignals().FlushBackgroundCallbacks(); + + // Any future callbacks will be dropped. This should absolutely be safe - if + // missing a callback results in an unrecoverable situation, unclean + // shutdown would too. The only reason to do the above flushes is to let the + // wallet catch up with our current chain to avoid any strange pruning edge + // cases and make next startup faster by avoiding rescan. + { LOCK(cs_main); if (pcoinsTip != nullptr) { @@ -259,6 +273,7 @@ } #endif UnregisterAllValidationInterfaces(); + GetMainSignals().UnregisterBackgroundSignalScheduler(); #ifdef ENABLE_WALLET for (CWalletRef pwallet : vpwallets) { delete pwallet; @@ -1778,6 +1793,8 @@ threadGroup.create_thread(boost::bind(&TraceThread, "scheduler", serviceLoop)); + GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); + /** * Start the RPC server. It will be started in "warmup" mode and not * process calls yet (but it will verify that the server is there and will diff --git a/src/qt/test/rpcnestedtests.cpp b/src/qt/test/rpcnestedtests.cpp --- a/src/qt/test/rpcnestedtests.cpp +++ b/src/qt/test/rpcnestedtests.cpp @@ -11,6 +11,7 @@ #include "rpc/register.h" #include "rpc/server.h" #include "rpcconsole.h" +#include "test/test_bitcoin.h" #include "test/testutil.h" #include "univalue.h" #include "util.h" @@ -32,12 +33,8 @@ }; void RPCNestedTests::rpcNestedTests() { - UniValue jsonRPCError; - - // Do some test setup could be moved to a more generic place when we add - // more tests on QT level - const Config &config = GetConfig(); - RegisterAllContextFreeRPCCommands(tableRPC); + // do some test setup + // could be moved to a more generic place when we add more tests on QT level tableRPC.appendCommand("rpcNestedTest", &vRPCCommands[0]); ClearDatadirCache(); std::string path = @@ -48,15 +45,8 @@ dir.mkpath("."); gArgs.ForceSetArg("-datadir", path); // mempool.setSanityCheck(1.0); - pblocktree = new CBlockTreeDB(1 << 20, true); - pcoinsdbview = new CCoinsViewDB(1 << 23, true); - pcoinsTip = new CCoinsViewCache(pcoinsdbview); - InitBlockIndex(config); - { - CValidationState state; - bool ok = ActivateBestChain(config, state); - QVERIFY(ok); - } + + TestingSetup test; SetRPCWarmupFinished(); @@ -209,13 +199,5 @@ std::runtime_error); #endif - UnloadBlockIndex(); - delete pcoinsTip; - pcoinsTip = nullptr; - delete pcoinsdbview; - pcoinsdbview = nullptr; - delete pblocktree; - pblocktree = nullptr; - fs::remove_all(fs::path(path)); } diff --git a/src/scheduler.h b/src/scheduler.h --- a/src/scheduler.h +++ b/src/scheduler.h @@ -5,6 +5,8 @@ #ifndef BITCOIN_SCHEDULER_H #define BITCOIN_SCHEDULER_H +#include "sync.h" + // // NOTE: // boost::thread / boost::chrono should be ported to @@ -12,6 +14,7 @@ // #include #include + #include // @@ -41,7 +44,9 @@ typedef std::function Function; // Call func at/after time t - void schedule(Function f, boost::chrono::system_clock::time_point t); + void schedule(Function f, + boost::chrono::system_clock::time_point t = + boost::chrono::system_clock::now()); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); @@ -68,6 +73,9 @@ size_t getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const; + // Returns true if there are threads actively running in serviceQueue() + bool AreThreadsServicingQueue() const; + private: std::multimap taskQueue; boost::condition_variable newTaskScheduled; @@ -80,4 +88,32 @@ } }; +/** + * Class used by CScheduler clients which may schedule multiple jobs + * which are required to be run serially. Does not require such jobs + * to be executed on the same thread, but no two jobs will be executed + * at the same time. + */ +class SingleThreadedSchedulerClient { +private: + CScheduler *m_pscheduler; + + CCriticalSection m_cs_callbacks_pending; + std::list> m_callbacks_pending; + bool m_are_callbacks_running = false; + + void MaybeScheduleProcessQueue(); + void ProcessQueue(); + +public: + SingleThreadedSchedulerClient(CScheduler *pschedulerIn) + : m_pscheduler(pschedulerIn) {} + void AddToProcessQueue(std::function func); + + // Processes all remaining queue members on the calling thread, blocking + // until queue is empty + // Must be called after the CScheduler has no remaining processing threads! + void EmptyQueue(); +}; + #endif diff --git a/src/scheduler.cpp b/src/scheduler.cpp --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -142,3 +142,72 @@ } return result; } + +bool CScheduler::AreThreadsServicingQueue() const { + return nThreadsServicingQueue; +} + +void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { + { + LOCK(m_cs_callbacks_pending); + // Try to avoid scheduling too many copies here, but if we + // accidentally have two ProcessQueue's scheduled at once its + // not a big deal. + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + } + m_pscheduler->schedule( + std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); +} + +void SingleThreadedSchedulerClient::ProcessQueue() { + std::function callback; + { + LOCK(m_cs_callbacks_pending); + if (m_are_callbacks_running) return; + if (m_callbacks_pending.empty()) return; + m_are_callbacks_running = true; + + callback = std::move(m_callbacks_pending.front()); + m_callbacks_pending.pop_front(); + } + + // RAII the setting of fCallbacksRunning and calling + // MaybeScheduleProcessQueue + // to ensure both happen safely even if callback() throws. + struct RAIICallbacksRunning { + SingleThreadedSchedulerClient *instance; + RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance) + : instance(_instance) {} + ~RAIICallbacksRunning() { + { + LOCK(instance->m_cs_callbacks_pending); + instance->m_are_callbacks_running = false; + } + instance->MaybeScheduleProcessQueue(); + } + } raiicallbacksrunning(this); + + callback(); +} + +void SingleThreadedSchedulerClient::AddToProcessQueue( + std::function func) { + assert(m_pscheduler); + + { + LOCK(m_cs_callbacks_pending); + m_callbacks_pending.emplace_back(std::move(func)); + } + MaybeScheduleProcessQueue(); +} + +void SingleThreadedSchedulerClient::EmptyQueue() { + assert(!m_pscheduler->AreThreadsServicingQueue()); + bool should_continue = true; + while (should_continue) { + ProcessQueue(); + LOCK(m_cs_callbacks_pending); + should_continue = !m_callbacks_pending.empty(); + } +} diff --git a/src/test/test_bitcoin.cpp b/src/test/test_bitcoin.cpp --- a/src/test/test_bitcoin.cpp +++ b/src/test/test_bitcoin.cpp @@ -90,6 +90,12 @@ (int)(InsecureRandRange(100000))); fs::create_directories(pathTemp); gArgs.ForceSetArg("-datadir", pathTemp.string()); + + // Note that because we don't bother running a scheduler thread here, + // callbacks via CValidationInterface are unreliable, but that's OK, + // our unit tests aren't testing multiple parts of the code at once. + GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); + mempool.setSanityCheck(1.0); pblocktree = new CBlockTreeDB(1 << 20, true); pcoinsdbview = new CCoinsViewDB(1 << 23, true); @@ -117,6 +123,8 @@ TestingSetup::~TestingSetup() { threadGroup.interrupt_all(); threadGroup.join_all(); + GetMainSignals().FlushBackgroundCallbacks(); + GetMainSignals().UnregisterBackgroundSignalScheduler(); g_connman.reset(); peerLogic.reset(); UnloadBlockIndex(); diff --git a/src/validationinterface.h b/src/validationinterface.h --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -8,8 +8,6 @@ #include "primitives/transaction.h" // CTransaction(Ref) -#include - #include class CBlock; @@ -21,6 +19,7 @@ class CValidationInterface; class CValidationState; class uint256; +class CScheduler; // These functions dispatch to one or all registered wallets @@ -53,17 +52,36 @@ * Called on a background thread. */ virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {} + /** + * Notifies listeners of a block being connected. + * Provides a vector of transactions evicted from the mempool as a result. + */ virtual void BlockConnected(const std::shared_ptr &block, const CBlockIndex *pindex, const std::vector &txnConflicted) {} + /** Notifies listeners of a block being disconnected */ virtual void BlockDisconnected(const std::shared_ptr &block) { } + /** Notifies listeners of the new active block chain on-disk. */ virtual void SetBestChain(const CBlockLocator &locator) {} + /** Notifies listeners about an inventory item being seen on the network. */ virtual void Inventory(const uint256 &hash) {} + /** Tells listeners to broadcast their data. */ virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman *connman) {} + /** + * Notifies listeners of a block validation result. + * If the provided CValidationState IsValid, the provided block + * is guaranteed to be the current best block at the time the + * callback was generated (not necessarily now) + */ virtual void BlockChecked(const CBlock &, const CValidationState &) {} + /** + * Notifies listeners that a block which builds directly on our current tip + * has been received and connected to the headers tree, though not validated + * yet. + */ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &block){}; friend void ::RegisterValidationInterface(CValidationInterface *); @@ -71,43 +89,43 @@ friend void ::UnregisterAllValidationInterfaces(); }; -struct CMainSignals { - /** Notifies listeners of updated block chain tip */ - boost::signals2::signal - UpdatedBlockTip; - /** Notifies listeners of a transaction having been added to mempool. */ - boost::signals2::signal - TransactionAddedToMempool; +struct MainSignalsInstance; +class CMainSignals { +private: + std::unique_ptr m_internals; + + friend void ::RegisterValidationInterface(CValidationInterface *); + friend void ::UnregisterValidationInterface(CValidationInterface *); + friend void ::UnregisterAllValidationInterfaces(); + +public: /** - * Notifies listeners of a block being connected. - * Provides a vector of transactions evicted from the mempool as a result. + * Register a CScheduler to give callbacks which should run in the + * background (may only be called once) */ - boost::signals2::signal &, - const CBlockIndex *pindex, - const std::vector &)> - BlockConnected; - /** Notifies listeners of a block being disconnected */ - boost::signals2::signal &)> - BlockDisconnected; - /** Notifies listeners of a new active block chain. */ - boost::signals2::signal SetBestChain; - /** Notifies listeners about an inventory item being seen on the network. */ - boost::signals2::signal Inventory; - /** Tells listeners to broadcast their data. */ - boost::signals2::signal - Broadcast; - /** Notifies listeners of a block validation result */ - boost::signals2::signal - BlockChecked; + void RegisterBackgroundSignalScheduler(CScheduler &scheduler); /** - * Notifies listeners that a block which builds directly on our current tip - * has been received and connected to the headers tree, though not validated - * yet. + * Unregister a CScheduler to give callbacks which should run in the + * background - these callbacks will now be dropped! */ - boost::signals2::signal &)> - NewPoWValidBlock; + void UnregisterBackgroundSignalScheduler(); + /** Call any remaining callbacks on the calling thread */ + void FlushBackgroundCallbacks(); + + void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, + bool fInitialDownload); + void TransactionAddedToMempool(const CTransactionRef &); + void BlockConnected(const std::shared_ptr &, + const CBlockIndex *pindex, + const std::vector &); + void BlockDisconnected(const std::shared_ptr &); + void UpdatedTransaction(const uint256 &); + void SetBestChain(const CBlockLocator &); + void Inventory(const uint256 &); + void Broadcast(int64_t nBestBlockTime, CConnman *connman); + void BlockChecked(const CBlock &, const CValidationState &); + void NewPoWValidBlock(const CBlockIndex *, + const std::shared_ptr &); }; CMainSignals &GetMainSignals(); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -4,63 +4,159 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "validationinterface.h" +#include "init.h" +#include "scheduler.h" +#include "sync.h" +#include "util.h" + +#include +#include + +#include + +struct MainSignalsInstance { + boost::signals2::signal + UpdatedBlockTip; + boost::signals2::signal + TransactionAddedToMempool; + boost::signals2::signal &, + const CBlockIndex *pindex, + const std::vector &)> + BlockConnected; + boost::signals2::signal &)> + BlockDisconnected; + boost::signals2::signal SetBestChain; + boost::signals2::signal Inventory; + boost::signals2::signal + Broadcast; + boost::signals2::signal + BlockChecked; + boost::signals2::signal &)> + NewPoWValidBlock; + + // We are not allowed to assume the scheduler only runs in one thread, + // but must ensure all callbacks happen in-order, so we end up creating + // our own queue here :( + SingleThreadedSchedulerClient m_schedulerClient; + + MainSignalsInstance(CScheduler *pscheduler) + : m_schedulerClient(pscheduler) {} +}; static CMainSignals g_signals; +void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler &scheduler) { + assert(!m_internals); + m_internals.reset(new MainSignalsInstance(&scheduler)); +} + +void CMainSignals::UnregisterBackgroundSignalScheduler() { + m_internals.reset(nullptr); +} + +void CMainSignals::FlushBackgroundCallbacks() { + m_internals->m_schedulerClient.EmptyQueue(); +} + CMainSignals &GetMainSignals() { return g_signals; } void RegisterValidationInterface(CValidationInterface *pwalletIn) { - g_signals.UpdatedBlockTip.connect(boost::bind( + g_signals.m_internals->UpdatedBlockTip.connect(boost::bind( &CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); - g_signals.TransactionAddedToMempool.connect(boost::bind( + g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind( &CValidationInterface::TransactionAddedToMempool, pwalletIn, _1)); - g_signals.BlockConnected.connect(boost::bind( + g_signals.m_internals->BlockConnected.connect(boost::bind( &CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3)); - g_signals.BlockDisconnected.connect( + g_signals.m_internals->BlockDisconnected.connect( boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1)); - g_signals.SetBestChain.connect( + g_signals.m_internals->SetBestChain.connect( boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); - g_signals.Inventory.connect( + g_signals.m_internals->Inventory.connect( boost::bind(&CValidationInterface::Inventory, pwalletIn, _1)); - g_signals.Broadcast.connect(boost::bind( + g_signals.m_internals->Broadcast.connect(boost::bind( &CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); - g_signals.BlockChecked.connect( + g_signals.m_internals->BlockChecked.connect( boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2)); - g_signals.NewPoWValidBlock.connect(boost::bind( + g_signals.m_internals->NewPoWValidBlock.connect(boost::bind( &CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); } void UnregisterValidationInterface(CValidationInterface *pwalletIn) { - g_signals.BlockChecked.disconnect( + g_signals.m_internals->BlockChecked.disconnect( boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2)); - g_signals.Broadcast.disconnect(boost::bind( + g_signals.m_internals->Broadcast.disconnect(boost::bind( &CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); - g_signals.Inventory.disconnect( + g_signals.m_internals->Inventory.disconnect( boost::bind(&CValidationInterface::Inventory, pwalletIn, _1)); - g_signals.SetBestChain.disconnect( + g_signals.m_internals->SetBestChain.disconnect( boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); - g_signals.TransactionAddedToMempool.disconnect(boost::bind( + g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind( &CValidationInterface::TransactionAddedToMempool, pwalletIn, _1)); - g_signals.BlockConnected.disconnect(boost::bind( + g_signals.m_internals->BlockConnected.disconnect(boost::bind( &CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3)); - g_signals.BlockDisconnected.disconnect( + g_signals.m_internals->BlockDisconnected.disconnect( boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1)); - g_signals.UpdatedBlockTip.disconnect(boost::bind( + g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind( &CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); - g_signals.NewPoWValidBlock.disconnect(boost::bind( + g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind( &CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); } void UnregisterAllValidationInterfaces() { - g_signals.BlockChecked.disconnect_all_slots(); - g_signals.Broadcast.disconnect_all_slots(); - g_signals.Inventory.disconnect_all_slots(); - g_signals.SetBestChain.disconnect_all_slots(); - g_signals.TransactionAddedToMempool.disconnect_all_slots(); - g_signals.BlockConnected.disconnect_all_slots(); - g_signals.BlockDisconnected.disconnect_all_slots(); - g_signals.UpdatedBlockTip.disconnect_all_slots(); - g_signals.NewPoWValidBlock.disconnect_all_slots(); + g_signals.m_internals->BlockChecked.disconnect_all_slots(); + g_signals.m_internals->Broadcast.disconnect_all_slots(); + g_signals.m_internals->Inventory.disconnect_all_slots(); + g_signals.m_internals->SetBestChain.disconnect_all_slots(); + g_signals.m_internals->TransactionAddedToMempool.disconnect_all_slots(); + g_signals.m_internals->BlockConnected.disconnect_all_slots(); + g_signals.m_internals->BlockDisconnected.disconnect_all_slots(); + g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots(); + g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots(); +} + +void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, + const CBlockIndex *pindexFork, + bool fInitialDownload) { + m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); +} + +void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) { + m_internals->TransactionAddedToMempool(ptx); +} + +void CMainSignals::BlockConnected( + const std::shared_ptr &pblock, const CBlockIndex *pindex, + const std::vector &vtxConflicted) { + m_internals->BlockConnected(pblock, pindex, vtxConflicted); +} + +void CMainSignals::BlockDisconnected( + const std::shared_ptr &pblock) { + m_internals->BlockDisconnected(pblock); +} + +void CMainSignals::SetBestChain(const CBlockLocator &locator) { + m_internals->SetBestChain(locator); +} + +void CMainSignals::Inventory(const uint256 &hash) { + m_internals->Inventory(hash); +} + +void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman *connman) { + m_internals->Broadcast(nBestBlockTime, connman); +} + +void CMainSignals::BlockChecked(const CBlock &block, + const CValidationState &state) { + m_internals->BlockChecked(block, state); +} + +void CMainSignals::NewPoWValidBlock( + const CBlockIndex *pindex, const std::shared_ptr &block) { + m_internals->NewPoWValidBlock(pindex, block); }