diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 2b59d1dc5..f34ae7f58 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -1,199 +1,204 @@ // Copyright (c) 2015-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "scheduler.h" #include "random.h" #include "reverselock.h" #include #include #include CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false) {} CScheduler::~CScheduler() { assert(nThreadsServicingQueue == 0); } void CScheduler::serviceQueue() { boost::unique_lock lock(newTaskMutex); ++nThreadsServicingQueue; // newTaskMutex is locked throughout this loop EXCEPT when the thread is // waiting or when the user's function is called. while (!shouldStop()) { try { if (!shouldStop() && taskQueue.empty()) { reverse_lock> rlock(lock); // Use this chance to get a tiny bit more entropy RandAddSeedSleep(); } while (!shouldStop() && taskQueue.empty()) { // Wait until there is something to do. newTaskScheduled.wait(lock); } // Wait until either there is a new task, or until the time of the // first item on the queue. // Some boost versions have a conflicting overload of wait_until // that returns void. Explicitly use a template here to avoid // hitting that overload. while (!shouldStop() && !taskQueue.empty()) { boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout) { // Exit loop after timeout, it means we reached the time of // the event break; } } // If there are multiple threads, the queue can empty while we're // waiting (another thread may service the task we were waiting on). if (shouldStop() || taskQueue.empty()) { continue; } Function f = taskQueue.begin()->second; taskQueue.erase(taskQueue.begin()); { // Unlock before calling f, so it can reschedule itself or // another task without deadlocking: reverse_lock> rlock(lock); f(); } } catch (...) { --nThreadsServicingQueue; throw; } } --nThreadsServicingQueue; newTaskScheduled.notify_one(); } void CScheduler::stop(bool drain) { { boost::unique_lock lock(newTaskMutex); if (drain) { stopWhenEmpty = true; } else { stopRequested = true; } } newTaskScheduled.notify_all(); } void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) { { boost::unique_lock lock(newTaskMutex); taskQueue.insert(std::make_pair(t, f)); } newTaskScheduled.notify_one(); } void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) { schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds)); } static void Repeat(CScheduler *s, CScheduler::Predicate p, int64_t deltaMilliSeconds) { if (p()) { s->scheduleFromNow(boost::bind(&Repeat, s, p, deltaMilliSeconds), deltaMilliSeconds); } } void CScheduler::scheduleEvery(CScheduler::Predicate p, int64_t deltaMilliSeconds) { scheduleFromNow(boost::bind(&Repeat, this, p, deltaMilliSeconds), deltaMilliSeconds); } size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const { boost::unique_lock lock(newTaskMutex); size_t result = taskQueue.size(); if (!taskQueue.empty()) { first = taskQueue.begin()->first; last = taskQueue.rbegin()->first; } return result; } bool CScheduler::AreThreadsServicingQueue() const { return nThreadsServicingQueue; } void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { { LOCK(m_cs_callbacks_pending); // Try to avoid scheduling too many copies here, but if we // accidentally have two ProcessQueue's scheduled at once its // not a big deal. if (m_are_callbacks_running) return; if (m_callbacks_pending.empty()) return; } m_pscheduler->schedule( std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); } void SingleThreadedSchedulerClient::ProcessQueue() { std::function callback; { LOCK(m_cs_callbacks_pending); if (m_are_callbacks_running) return; if (m_callbacks_pending.empty()) return; m_are_callbacks_running = true; callback = std::move(m_callbacks_pending.front()); m_callbacks_pending.pop_front(); } // RAII the setting of fCallbacksRunning and calling // MaybeScheduleProcessQueue // to ensure both happen safely even if callback() throws. struct RAIICallbacksRunning { SingleThreadedSchedulerClient *instance; explicit RAIICallbacksRunning(SingleThreadedSchedulerClient *_instance) : instance(_instance) {} ~RAIICallbacksRunning() { { LOCK(instance->m_cs_callbacks_pending); instance->m_are_callbacks_running = false; } instance->MaybeScheduleProcessQueue(); } } raiicallbacksrunning(this); callback(); } void SingleThreadedSchedulerClient::AddToProcessQueue( std::function func) { assert(m_pscheduler); { LOCK(m_cs_callbacks_pending); m_callbacks_pending.emplace_back(std::move(func)); } MaybeScheduleProcessQueue(); } void SingleThreadedSchedulerClient::EmptyQueue() { assert(!m_pscheduler->AreThreadsServicingQueue()); bool should_continue = true; while (should_continue) { ProcessQueue(); LOCK(m_cs_callbacks_pending); should_continue = !m_callbacks_pending.empty(); } } + +size_t SingleThreadedSchedulerClient::CallbacksPending() { + LOCK(m_cs_callbacks_pending); + return m_callbacks_pending.size(); +} diff --git a/src/scheduler.h b/src/scheduler.h index 883afafe9..86700c92d 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,119 +1,121 @@ // Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_SCHEDULER_H #define BITCOIN_SCHEDULER_H #include "sync.h" // // NOTE: // boost::thread / boost::chrono should be ported to // std::thread / std::chrono when we support C++11. // #include #include #include // // Simple class for background tasks that should be run periodically or once // "after a while" // // Usage: // // CScheduler* s = new CScheduler(); // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, // s)); // // ... then at program shutdown, clean up the thread running serviceQueue: // t->interrupt(); // t->join(); // delete t; // delete s; // Must be done after thread is interrupted/joined. // class CScheduler { public: CScheduler(); ~CScheduler(); typedef std::function Function; typedef std::function Predicate; // Call func at/after time t void schedule(Function f, boost::chrono::system_clock::time_point t = boost::chrono::system_clock::now()); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); // Another convenience method: call f approximately every deltaMilliSeconds // forever, starting deltaMilliSeconds from now. To be more precise: every // time f is finished, it is rescheduled to run deltaMilliSeconds later. If // you need more accurate scheduling, don't use this method. void scheduleEvery(Predicate p, int64_t deltaMilliSeconds); // To keep things as simple as possible, there is no unschedule. // Services the queue 'forever'. Should be run in a thread, and interrupted // using boost::interrupt_thread void serviceQueue(); // Tell any threads running serviceQueue to stop as soon as they're done // servicing whatever task they're currently servicing (drain=false) or when // there is no work left to be done (drain=true) void stop(bool drain = false); // Returns number of tasks waiting to be serviced, and first and last task // times size_t getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const; // Returns true if there are threads actively running in serviceQueue() bool AreThreadsServicingQueue() const; private: std::multimap taskQueue; boost::condition_variable newTaskScheduled; mutable boost::mutex newTaskMutex; int nThreadsServicingQueue; bool stopRequested; bool stopWhenEmpty; bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; /** * Class used by CScheduler clients which may schedule multiple jobs * which are required to be run serially. Does not require such jobs * to be executed on the same thread, but no two jobs will be executed * at the same time. */ class SingleThreadedSchedulerClient { private: CScheduler *m_pscheduler; CCriticalSection m_cs_callbacks_pending; std::list> m_callbacks_pending; bool m_are_callbacks_running = false; void MaybeScheduleProcessQueue(); void ProcessQueue(); public: explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} void AddToProcessQueue(std::function func); // Processes all remaining queue members on the calling thread, blocking // until queue is empty // Must be called after the CScheduler has no remaining processing threads! void EmptyQueue(); + + size_t CallbacksPending(); }; #endif diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index bf35a806c..12dafb416 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -1,198 +1,205 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "validationinterface.h" #include "init.h" #include "scheduler.h" #include "sync.h" #include "txmempool.h" #include "util.h" #include #include #include struct MainSignalsInstance { boost::signals2::signal UpdatedBlockTip; boost::signals2::signal TransactionAddedToMempool; boost::signals2::signal &, const CBlockIndex *pindex, const std::vector &)> BlockConnected; boost::signals2::signal &)> BlockDisconnected; boost::signals2::signal TransactionRemovedFromMempool; boost::signals2::signal SetBestChain; boost::signals2::signal Inventory; boost::signals2::signal Broadcast; boost::signals2::signal BlockChecked; boost::signals2::signal &)> NewPoWValidBlock; // We are not allowed to assume the scheduler only runs in one thread, // but must ensure all callbacks happen in-order, so we end up creating // our own queue here :( SingleThreadedSchedulerClient m_schedulerClient; explicit MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {} }; static CMainSignals g_signals; void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler &scheduler) { assert(!m_internals); m_internals.reset(new MainSignalsInstance(&scheduler)); } void CMainSignals::UnregisterBackgroundSignalScheduler() { m_internals.reset(nullptr); } void CMainSignals::FlushBackgroundCallbacks() { if (m_internals) { m_internals->m_schedulerClient.EmptyQueue(); } } +size_t CMainSignals::CallbacksPending() { + if (!m_internals) { + return 0; + } + return m_internals->m_schedulerClient.CallbacksPending(); +} + void CMainSignals::RegisterWithMempoolSignals(CTxMemPool &pool) { pool.NotifyEntryRemoved.connect( boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2)); } void CMainSignals::UnregisterWithMempoolSignals(CTxMemPool &pool) { pool.NotifyEntryRemoved.disconnect( boost::bind(&CMainSignals::MempoolEntryRemoved, this, _1, _2)); } CMainSignals &GetMainSignals() { return g_signals; } void RegisterValidationInterface(CValidationInterface *pwalletIn) { g_signals.m_internals->UpdatedBlockTip.connect(boost::bind( &CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); g_signals.m_internals->TransactionAddedToMempool.connect(boost::bind( &CValidationInterface::TransactionAddedToMempool, pwalletIn, _1)); g_signals.m_internals->BlockConnected.connect(boost::bind( &CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3)); g_signals.m_internals->BlockDisconnected.connect( boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1)); g_signals.m_internals->TransactionRemovedFromMempool.connect(boost::bind( &CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1)); g_signals.m_internals->SetBestChain.connect( boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); g_signals.m_internals->Inventory.connect( boost::bind(&CValidationInterface::Inventory, pwalletIn, _1)); g_signals.m_internals->Broadcast.connect(boost::bind( &CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); g_signals.m_internals->BlockChecked.connect( boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2)); g_signals.m_internals->NewPoWValidBlock.connect(boost::bind( &CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); } void UnregisterValidationInterface(CValidationInterface *pwalletIn) { g_signals.m_internals->BlockChecked.disconnect( boost::bind(&CValidationInterface::BlockChecked, pwalletIn, _1, _2)); g_signals.m_internals->Broadcast.disconnect(boost::bind( &CValidationInterface::ResendWalletTransactions, pwalletIn, _1, _2)); g_signals.m_internals->Inventory.disconnect( boost::bind(&CValidationInterface::Inventory, pwalletIn, _1)); g_signals.m_internals->SetBestChain.disconnect( boost::bind(&CValidationInterface::SetBestChain, pwalletIn, _1)); g_signals.m_internals->TransactionAddedToMempool.disconnect(boost::bind( &CValidationInterface::TransactionAddedToMempool, pwalletIn, _1)); g_signals.m_internals->BlockConnected.disconnect(boost::bind( &CValidationInterface::BlockConnected, pwalletIn, _1, _2, _3)); g_signals.m_internals->BlockDisconnected.disconnect( boost::bind(&CValidationInterface::BlockDisconnected, pwalletIn, _1)); g_signals.m_internals->TransactionRemovedFromMempool.disconnect(boost::bind( &CValidationInterface::TransactionRemovedFromMempool, pwalletIn, _1)); g_signals.m_internals->UpdatedBlockTip.disconnect(boost::bind( &CValidationInterface::UpdatedBlockTip, pwalletIn, _1, _2, _3)); g_signals.m_internals->NewPoWValidBlock.disconnect(boost::bind( &CValidationInterface::NewPoWValidBlock, pwalletIn, _1, _2)); } void UnregisterAllValidationInterfaces() { if (!g_signals.m_internals) { return; } g_signals.m_internals->BlockChecked.disconnect_all_slots(); g_signals.m_internals->Broadcast.disconnect_all_slots(); g_signals.m_internals->Inventory.disconnect_all_slots(); g_signals.m_internals->SetBestChain.disconnect_all_slots(); g_signals.m_internals->TransactionAddedToMempool.disconnect_all_slots(); g_signals.m_internals->BlockConnected.disconnect_all_slots(); g_signals.m_internals->BlockDisconnected.disconnect_all_slots(); g_signals.m_internals->TransactionRemovedFromMempool.disconnect_all_slots(); g_signals.m_internals->UpdatedBlockTip.disconnect_all_slots(); g_signals.m_internals->NewPoWValidBlock.disconnect_all_slots(); } void CallFunctionInValidationInterfaceQueue(std::function func) { g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); } void CMainSignals::MempoolEntryRemoved(CTransactionRef ptx, MemPoolRemovalReason reason) { if (reason != MemPoolRemovalReason::BLOCK && reason != MemPoolRemovalReason::CONFLICT) { m_internals->m_schedulerClient.AddToProcessQueue( [ptx, this] { m_internals->TransactionRemovedFromMempool(ptx); }); } } void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { m_internals->UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); } void CMainSignals::TransactionAddedToMempool(const CTransactionRef &ptx) { m_internals->TransactionAddedToMempool(ptx); } void CMainSignals::BlockConnected( const std::shared_ptr &pblock, const CBlockIndex *pindex, const std::vector &vtxConflicted) { m_internals->BlockConnected(pblock, pindex, vtxConflicted); } void CMainSignals::BlockDisconnected( const std::shared_ptr &pblock) { m_internals->BlockDisconnected(pblock); } void CMainSignals::SetBestChain(const CBlockLocator &locator) { m_internals->SetBestChain(locator); } void CMainSignals::Inventory(const uint256 &hash) { m_internals->Inventory(hash); } void CMainSignals::Broadcast(int64_t nBestBlockTime, CConnman *connman) { m_internals->Broadcast(nBestBlockTime, connman); } void CMainSignals::BlockChecked(const CBlock &block, const CValidationState &state) { m_internals->BlockChecked(block, state); } void CMainSignals::NewPoWValidBlock( const CBlockIndex *pindex, const std::shared_ptr &block) { m_internals->NewPoWValidBlock(pindex, block); } diff --git a/src/validationinterface.h b/src/validationinterface.h index 5e55e84fe..e91a7dc86 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -1,165 +1,167 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_VALIDATIONINTERFACE_H #define BITCOIN_VALIDATIONINTERFACE_H #include "primitives/transaction.h" // CTransaction(Ref) #include #include class CBlock; class CBlockIndex; struct CBlockLocator; class CBlockIndex; class CConnman; class CReserveScript; class CValidationInterface; class CValidationState; class uint256; class CScheduler; class CTxMemPool; enum class MemPoolRemovalReason; // These functions dispatch to one or all registered wallets /** Register a wallet to receive updates from core */ void RegisterValidationInterface(CValidationInterface *pwalletIn); /** Unregister a wallet from core */ void UnregisterValidationInterface(CValidationInterface *pwalletIn); /** Unregister all wallets from core */ void UnregisterAllValidationInterfaces(); /** * Pushes a function to callback onto the notification queue, guaranteeing any * callbacks generated prior to now are finished when the function is called. * * Be very careful blocking on func to be called if any locks are held - * validation interface clients may not be able to make progress as they often * wait for things like cs_main, so blocking until func is called with cs_main * will result in a deadlock (that DEBUG_LOCKORDER will miss). */ void CallFunctionInValidationInterfaceQueue(std::function func); class CValidationInterface { protected: /** * Protected destructor so that instances can only be deleted by derived * classes. If that restriction is no longer desired, this should be made * public and virtual. */ ~CValidationInterface() = default; /** * Notifies listeners of updated block chain tip * * Called on a background thread. */ virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {} /** * Notifies listeners of a transaction having been added to mempool. * * Called on a background thread. */ virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {} /** * Notifies listeners of a transaction leaving mempool. * * This only fires for transactions which leave mempool because of expiry, * size limiting, reorg (changes in lock times/coinbase maturity), or * replacement. This does not include any transactions which are included * in BlockConnectedDisconnected either in block->vtx or in txnConflicted. * * Called on a background thread. */ virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. */ virtual void BlockConnected(const std::shared_ptr &block, const CBlockIndex *pindex, const std::vector &txnConflicted) {} /** Notifies listeners of a block being disconnected */ virtual void BlockDisconnected(const std::shared_ptr &block) { } /** Notifies listeners of the new active block chain on-disk. */ virtual void SetBestChain(const CBlockLocator &locator) {} /** Notifies listeners about an inventory item being seen on the network. */ virtual void Inventory(const uint256 &hash) {} /** Tells listeners to broadcast their data. */ virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman *connman) {} /** * Notifies listeners of a block validation result. * If the provided CValidationState IsValid, the provided block * is guaranteed to be the current best block at the time the * callback was generated (not necessarily now) */ virtual void BlockChecked(const CBlock &, const CValidationState &) {} /** * Notifies listeners that a block which builds directly on our current tip * has been received and connected to the headers tree, though not validated * yet. */ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &block){}; friend void ::RegisterValidationInterface(CValidationInterface *); friend void ::UnregisterValidationInterface(CValidationInterface *); friend void ::UnregisterAllValidationInterfaces(); }; struct MainSignalsInstance; class CMainSignals { private: std::unique_ptr m_internals; friend void ::RegisterValidationInterface(CValidationInterface *); friend void ::UnregisterValidationInterface(CValidationInterface *); friend void ::UnregisterAllValidationInterfaces(); friend void ::CallFunctionInValidationInterfaceQueue( std::function func); void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason); public: /** * Register a CScheduler to give callbacks which should run in the * background (may only be called once) */ void RegisterBackgroundSignalScheduler(CScheduler &scheduler); /** * Unregister a CScheduler to give callbacks which should run in the * background - these callbacks will now be dropped! */ void UnregisterBackgroundSignalScheduler(); /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); + size_t CallbacksPending(); + /** Register with mempool to call TransactionRemovedFromMempool callbacks */ void RegisterWithMempoolSignals(CTxMemPool &pool); /** Unregister with mempool */ void UnregisterWithMempoolSignals(CTxMemPool &pool); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const CTransactionRef &); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex, const std::vector &); void BlockDisconnected(const std::shared_ptr &); void SetBestChain(const CBlockLocator &); void Inventory(const uint256 &); void Broadcast(int64_t nBestBlockTime, CConnman *connman); void BlockChecked(const CBlock &, const CValidationState &); void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr &); }; CMainSignals &GetMainSignals(); #endif // BITCOIN_VALIDATIONINTERFACE_H