diff --git a/src/scheduler.h b/src/scheduler.h index a8165559e4..f5fc25ec04 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -1,122 +1,133 @@ // Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_SCHEDULER_H #define BITCOIN_SCHEDULER_H #include // // NOTE: // boost::thread / boost::chrono should be ported to // std::thread / std::chrono when we support C++11. // #include #include #include // // Simple class for background tasks that should be run periodically or once // "after a while" // // Usage: // // CScheduler* s = new CScheduler(); // s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { } // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); // // ... then at program shutdown, clean up the thread running serviceQueue: // t->interrupt(); // t->join(); // delete t; // delete s; // Must be done after thread is interrupted/joined. // class CScheduler { public: CScheduler(); ~CScheduler(); typedef std::function Function; typedef std::function Predicate; // Call func at/after time t void schedule(Function f, boost::chrono::system_clock::time_point t = boost::chrono::system_clock::now()); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); // Another convenience method: call p approximately every deltaMilliSeconds // forever, starting deltaMilliSeconds from now untill p returns false. To // be more precise: every time p is finished, it is rescheduled to run // deltaMilliSeconds later. If you need more accurate scheduling, don't use // this method. void scheduleEvery(Predicate p, int64_t deltaMilliSeconds); // To keep things as simple as possible, there is no unschedule. // Services the queue 'forever'. Should be run in a thread, and interrupted // using boost::interrupt_thread void serviceQueue(); // Tell any threads running serviceQueue to stop as soon as they're done // servicing whatever task they're currently servicing (drain=false) or when // there is no work left to be done (drain=true) void stop(bool drain = false); // Returns number of tasks waiting to be serviced, and first and last task // times size_t getQueueInfo(boost::chrono::system_clock::time_point &first, boost::chrono::system_clock::time_point &last) const; // Returns true if there are threads actively running in serviceQueue() bool AreThreadsServicingQueue() const; private: std::multimap taskQueue; boost::condition_variable newTaskScheduled; mutable boost::mutex newTaskMutex; int nThreadsServicingQueue; bool stopRequested; bool stopWhenEmpty; bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; /** * Class used by CScheduler clients which may schedule multiple jobs - * which are required to be run serially. Does not require such jobs - * to be executed on the same thread, but no two jobs will be executed - * at the same time. + * which are required to be run serially. Jobs may not be run on the + * same thread, but no two jobs will be executed + * at the same time and memory will be release-acquire consistent + * (the scheduler will internally do an acquire before invoking a callback + * as well as a release at the end). In practice this means that a callback + * B() will be able to observe all of the effects of callback A() which executed + * before it. */ class SingleThreadedSchedulerClient { private: CScheduler *m_pscheduler; CCriticalSection m_cs_callbacks_pending; std::list> m_callbacks_pending GUARDED_BY(m_cs_callbacks_pending); bool m_are_callbacks_running GUARDED_BY(m_cs_callbacks_pending) = false; void MaybeScheduleProcessQueue(); void ProcessQueue(); public: explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {} + + /** + * Add a callback to be executed. Callbacks are executed serially + * and memory is sequentially consistent between callback executions. + * Practially, this means that callbacks can behave as if they are executed + * in order by a single thread. + */ void AddToProcessQueue(std::function func); // Processes all remaining queue members on the calling thread, blocking // until queue is empty. Must be called after the CScheduler has no // remaining processing threads! void EmptyQueue(); size_t CallbacksPending(); }; #endif diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index 3ca90bbc60..e7882d952c 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -1,177 +1,220 @@ // Copyright (c) 2012-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include BOOST_AUTO_TEST_SUITE(scheduler_tests) static void microTask(CScheduler &s, boost::mutex &mutex, int &counter, int delta, boost::chrono::system_clock::time_point rescheduleTime) { { boost::unique_lock lock(mutex); counter += delta; } boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min(); if (rescheduleTime != noTime) { CScheduler::Function f = boost::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime); s.schedule(f, rescheduleTime); } } static void MicroSleep(uint64_t n) { boost::this_thread::sleep_for(boost::chrono::microseconds(n)); } BOOST_AUTO_TEST_CASE(manythreads) { // Stress test: hundreds of microsecond-scheduled tasks, // serviced by 10 threads. // // So... ten shared counters, which if all the tasks execute // properly will sum to the number of tasks done. // Each task adds or subtracts a random amount from one of the // counters, and then schedules another task 0-1000 // microseconds in the future to subtract or add from // the counter -random_amount+1, so in the end the shared // counters should sum to the number of initial tasks performed. CScheduler microTasks; boost::mutex counterMutex[10]; int counter[10] = {0}; boost::random::mt19937 rng(42); boost::random::uniform_int_distribution<> zeroToNine(0, 9); boost::random::uniform_int_distribution<> randomMsec(-11, 1000); boost::random::uniform_int_distribution<> randomDelta(-1000, 1000); boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); boost::chrono::system_clock::time_point now = start; boost::chrono::system_clock::time_point first, last; size_t nTasks = microTasks.getQueueInfo(first, last); BOOST_CHECK(nTasks == 0); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 100; ++i) { boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = boost::bind( µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), randomDelta(rng), tReschedule); microTasks.schedule(f, t); } nTasks = microTasks.getQueueInfo(first, last); BOOST_CHECK(nTasks == 100); BOOST_CHECK(first < last); BOOST_CHECK(last > now); // As soon as these are created they will start running and servicing the // queue boost::thread_group microThreads; for (int i = 0; i < 5; i++) { microThreads.create_thread( boost::bind(&CScheduler::serviceQueue, µTasks)); } MicroSleep(600); now = boost::chrono::system_clock::now(); // More threads and more tasks: for (int i = 0; i < 5; i++) { microThreads.create_thread( boost::bind(&CScheduler::serviceQueue, µTasks)); } for (int i = 0; i < 100; i++) { boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = boost::bind( µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), randomDelta(rng), tReschedule); microTasks.schedule(f, t); } // Drain the task queue then exit threads microTasks.stop(true); // ... wait until all the threads are done microThreads.join_all(); int counterSum = 0; for (int i = 0; i < 10; i++) { BOOST_CHECK(counter[i] != 0); counterSum += counter[i]; } BOOST_CHECK_EQUAL(counterSum, 200); } BOOST_AUTO_TEST_CASE(schedule_every) { CScheduler scheduler; boost::condition_variable cvar; std::atomic counter{15}; std::atomic keepRunning{true}; scheduler.scheduleEvery( [&keepRunning, &cvar, &counter, &scheduler]() { BOOST_CHECK(counter > 0); cvar.notify_all(); if (--counter > 0) { return true; } // We reached the end of our test, make sure nothing run again for // 100ms. scheduler.scheduleFromNow( [&keepRunning, &cvar]() { keepRunning = false; cvar.notify_all(); }, 100); // We set the counter to some magic value to check the scheduler // empty its queue properly after 120ms. scheduler.scheduleFromNow([&counter]() { counter = 42; }, 120); return false; }, 5); // Start the scheduler thread. std::thread schedulerThread( std::bind(&CScheduler::serviceQueue, &scheduler)); boost::mutex mutex; boost::unique_lock lock(mutex); while (keepRunning) { cvar.wait(lock); BOOST_CHECK(counter >= 0); } BOOST_CHECK_EQUAL(counter, 0); scheduler.stop(true); schedulerThread.join(); BOOST_CHECK_EQUAL(counter, 42); } +BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) { + CScheduler scheduler; + + // each queue should be well ordered with respect to itself but not other + // queues + SingleThreadedSchedulerClient queue1(&scheduler); + SingleThreadedSchedulerClient queue2(&scheduler); + + // create more threads than queues + // if the queues only permit execution of one task at once then + // the extra threads should effectively be doing nothing + // if they don't we'll get out of order behaviour + boost::thread_group threads; + for (int i = 0; i < 5; ++i) { + threads.create_thread( + boost::bind(&CScheduler::serviceQueue, &scheduler)); + } + + // these are not atomic, if SinglethreadedSchedulerClient prevents + // parallel execution at the queue level no synchronization should be + // required here + int counter1 = 0; + int counter2 = 0; + + // just simply count up on each queue - if execution is properly ordered + // then the callbacks should run in exactly the order in which they were + // enqueued + for (int i = 0; i < 100; ++i) { + queue1.AddToProcessQueue( + [i, &counter1]() { BOOST_CHECK_EQUAL(i, counter1++); }); + + queue2.AddToProcessQueue( + [i, &counter2]() { BOOST_CHECK_EQUAL(i, counter2++); }); + } + + // finish up + scheduler.stop(true); + threads.join_all(); + + BOOST_CHECK_EQUAL(counter1, 100); + BOOST_CHECK_EQUAL(counter2, 100); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/validationinterface.h b/src/validationinterface.h index dd560178b6..3d3f086806 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -1,192 +1,207 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2016 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_VALIDATIONINTERFACE_H #define BITCOIN_VALIDATIONINTERFACE_H #include // CTransaction(Ref) #include #include class CBlock; class CBlockIndex; struct CBlockLocator; class CBlockIndex; class CConnman; class CReserveScript; class CValidationInterface; class CValidationState; class uint256; class CScheduler; class CTxMemPool; enum class MemPoolRemovalReason; // These functions dispatch to one or all registered wallets /** Register a wallet to receive updates from core */ void RegisterValidationInterface(CValidationInterface *pwalletIn); /** Unregister a wallet from core */ void UnregisterValidationInterface(CValidationInterface *pwalletIn); /** Unregister all wallets from core */ void UnregisterAllValidationInterfaces(); /** * Pushes a function to callback onto the notification queue, guaranteeing any * callbacks generated prior to now are finished when the function is called. * * Be very careful blocking on func to be called if any locks are held - * validation interface clients may not be able to make progress as they often * wait for things like cs_main, so blocking until func is called with cs_main * will result in a deadlock (that DEBUG_LOCKORDER will miss). */ void CallFunctionInValidationInterfaceQueue(std::function func); /** * This is a synonym for the following, which asserts certain locks are not * held: * std::promise promise; * CallFunctionInValidationInterfaceQueue([&promise] { * promise.set_value(); * }); * promise.get_future().wait(); */ void SyncWithValidationInterfaceQueue(); +/** + * Implement this to subscribe to events generated in validation + * + * Each CValidationInterface() subscriber will receive event callbacks + * in the order in which the events were generated by validation. + * Furthermore, each ValidationInterface() subscriber may assume that + * callbacks effectively run in a single thread with single-threaded + * memory consistency. That is, for a given ValidationInterface() + * instantiation, each callback will complete before the next one is + * invoked. This means, for example when a block is connected that the + * UpdatedBlockTip() callback may depend on an operation performed in + * the BlockConnected() callback without worrying about explicit + * synchronization. No ordering should be assumed across + * ValidationInterface() subscribers. + */ class CValidationInterface { protected: /** * Protected destructor so that instances can only be deleted by derived * classes. If that restriction is no longer desired, this should be made * public and virtual. */ ~CValidationInterface() = default; /** * Notifies listeners of updated block chain tip * * Called on a background thread. */ virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {} /** * Notifies listeners of a transaction having been added to mempool. * * Called on a background thread. */ virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {} /** * Notifies listeners of a transaction leaving mempool. * * This only fires for transactions which leave mempool because of expiry, * size limiting, reorg (changes in lock times/coinbase maturity), or * replacement. This does not include any transactions which are included * in BlockConnectedDisconnected either in block->vtx or in txnConflicted. * * Called on a background thread. */ virtual void TransactionRemovedFromMempool(const CTransactionRef &ptx) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. */ virtual void BlockConnected(const std::shared_ptr &block, const CBlockIndex *pindex, const std::vector &txnConflicted) {} /** Notifies listeners of a block being disconnected */ virtual void BlockDisconnected(const std::shared_ptr &block) { } /** * Notifies listeners of the new active block chain on-disk. * * Prior to this callback, any updates are not guaranteed to persist on disk * (ie clients need to handle shutdown/restart safety by being able to * understand when some updates were lost due to unclean shutdown). * * When this callback is invoked, the validation changes done by any prior * callback are guaranteed to exist on disk and survive a restart, including * an unclean shutdown. * * Provides a locator describing the best chain, which is likely useful for * storing current state on disk in client DBs. * * Called on a background thread. */ virtual void ChainStateFlushed(const CBlockLocator &locator) {} /** Notifies listeners about an inventory item being seen on the network. */ virtual void Inventory(const uint256 &hash) {} /** Tells listeners to broadcast their data. */ virtual void ResendWalletTransactions(int64_t nBestBlockTime, CConnman *connman) {} /** * Notifies listeners of a block validation result. * If the provided CValidationState IsValid, the provided block * is guaranteed to be the current best block at the time the * callback was generated (not necessarily now) */ virtual void BlockChecked(const CBlock &, const CValidationState &) {} /** * Notifies listeners that a block which builds directly on our current tip * has been received and connected to the headers tree, though not validated * yet. */ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &block){}; friend void ::RegisterValidationInterface(CValidationInterface *); friend void ::UnregisterValidationInterface(CValidationInterface *); friend void ::UnregisterAllValidationInterfaces(); }; struct MainSignalsInstance; class CMainSignals { private: std::unique_ptr m_internals; friend void ::RegisterValidationInterface(CValidationInterface *); friend void ::UnregisterValidationInterface(CValidationInterface *); friend void ::UnregisterAllValidationInterfaces(); friend void ::CallFunctionInValidationInterfaceQueue( std::function func); void MempoolEntryRemoved(CTransactionRef tx, MemPoolRemovalReason reason); public: /** * Register a CScheduler to give callbacks which should run in the * background (may only be called once) */ void RegisterBackgroundSignalScheduler(CScheduler &scheduler); /** * Unregister a CScheduler to give callbacks which should run in the * background - these callbacks will now be dropped! */ void UnregisterBackgroundSignalScheduler(); /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); size_t CallbacksPending(); /** Register with mempool to call TransactionRemovedFromMempool callbacks */ void RegisterWithMempoolSignals(CTxMemPool &pool); /** Unregister with mempool */ void UnregisterWithMempoolSignals(CTxMemPool &pool); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const CTransactionRef &); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex, const std::vector &); void BlockDisconnected(const std::shared_ptr &); void ChainStateFlushed(const CBlockLocator &); void Inventory(const uint256 &); void Broadcast(int64_t nBestBlockTime, CConnman *connman); void BlockChecked(const CBlock &, const CValidationState &); void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr &); }; CMainSignals &GetMainSignals(); #endif // BITCOIN_VALIDATIONINTERFACE_H