diff --git a/src/bench/checkqueue.cpp b/src/bench/checkqueue.cpp
index 50735806c..02be77a59 100644
--- a/src/bench/checkqueue.cpp
+++ b/src/bench/checkqueue.cpp
@@ -1,73 +1,67 @@
 // Copyright (c) 2015 The Bitcoin Core developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
 #include <bench/bench.h>
 #include <checkqueue.h>
 #include <key.h>
 #include <prevector.h>
 #include <pubkey.h>
 #include <random.h>
 #include <util/system.h>
 
-#include <boost/thread/thread.hpp>
-
 #include <vector>
 
 static const int MIN_CORES = 2;
 static const size_t BATCHES = 101;
 static const size_t BATCH_SIZE = 30;
 static const int PREVECTOR_SIZE = 28;
 static const size_t QUEUE_BATCH_SIZE = 128;
 
 // This Benchmark tests the CheckQueue with a slightly realistic workload, where
 // checks all contain a prevector that is indirect 50% of the time and there is
 // a little bit of work done between calls to Add.
 static void CCheckQueueSpeedPrevectorJob(benchmark::Bench &bench) {
     const ECCVerifyHandle verify_handle;
     ECC_Start();
 
     struct PrevectorJob {
         prevector<PREVECTOR_SIZE, uint8_t> p;
         PrevectorJob() {}
         explicit PrevectorJob(FastRandomContext &insecure_rand) {
             p.resize(insecure_rand.randrange(PREVECTOR_SIZE * 2));
         }
         bool operator()() { return true; }
         void swap(PrevectorJob &x) { p.swap(x.p); };
     };
     CCheckQueue<PrevectorJob> queue{QUEUE_BATCH_SIZE};
-    boost::thread_group tg;
-    for (auto x = 0; x < std::max(MIN_CORES, GetNumCores()); ++x) {
-        tg.create_thread([&] { queue.Thread(); });
-    }
+    queue.StartWorkerThreads(std::max(MIN_CORES, GetNumCores()));
 
     // create all the data once, then submit copies in the benchmark.
     FastRandomContext insecure_rand(true);
     std::vector<std::vector<PrevectorJob>> vBatches(BATCHES);
     for (auto &vChecks : vBatches) {
         vChecks.reserve(BATCH_SIZE);
         for (size_t x = 0; x < BATCH_SIZE; ++x) {
             vChecks.emplace_back(insecure_rand);
         }
     }
 
     bench.minEpochIterations(10)
         .batch(BATCH_SIZE * BATCHES)
         .unit("job")
         .run([&] {
             // Make insecure_rand here so that each iteration is identical.
             CCheckQueueControl<PrevectorJob> control(&queue);
             std::vector<std::vector<PrevectorJob>> vBatches(BATCHES);
             for (auto &vChecks : vBatches) {
                 control.Add(vChecks);
             }
             // control waits for completion by RAII, but it is done explicitly
             // here for clarity
             control.Wait();
         });
-    tg.interrupt_all();
-    tg.join_all();
+    queue.StopWorkerThreads();
     ECC_Stop();
 }
 
 BENCHMARK(CCheckQueueSpeedPrevectorJob);
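With this change the benchmark no longer owns the worker threads: the boost::thread_group plus per-thread queue.Thread() calls are replaced by the queue's own std::thread pool, started with StartWorkerThreads() and torn down with StopWorkerThreads(). A minimal lifecycle sketch, assuming the in-tree checkqueue.h and a hypothetical TrivialCheck type (not part of this patch):

```cpp
// Minimal lifecycle sketch, assuming the in-tree checkqueue.h.
// TrivialCheck is a hypothetical check type, not part of this patch.
#include <checkqueue.h>

#include <vector>

struct TrivialCheck {
    bool operator()() { return true; }    // the check itself
    void swap(TrivialCheck &) noexcept {} // required by Add()/Loop()
};

int main() {
    CCheckQueue<TrivialCheck> queue{/*nBatchSizeIn=*/128};

    // Previously: boost::thread_group tg + N calls to queue.Thread().
    queue.StartWorkerThreads(/*threads_num=*/4);

    std::vector<TrivialCheck> checks(1000);
    queue.Add(checks);          // checks are swapped into the internal LIFO
    bool all_ok = queue.Wait(); // the calling thread joins in as the master

    // Previously: tg.interrupt_all(); tg.join_all();
    queue.StopWorkerThreads();  // must happen before ~CCheckQueue()
    return all_ok ? 0 : 1;
}
```

The Start/Add/Wait/Stop order matters: StopWorkerThreads() has to run before the queue is destroyed, because ~CCheckQueue() asserts that m_worker_threads is empty.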
diff --git a/src/checkqueue.h b/src/checkqueue.h
index b8ab4ba23..97a0e5bca 100644
--- a/src/checkqueue.h
+++ b/src/checkqueue.h
@@ -1,252 +1,249 @@
 // Copyright (c) 2012-2018 The Bitcoin Core developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
 
 #ifndef BITCOIN_CHECKQUEUE_H
 #define BITCOIN_CHECKQUEUE_H
 
 #include <sync.h>
 #include <tinyformat.h>
 #include <util/threadnames.h>
 
 #include <algorithm>
 #include <vector>
 
 #include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
 
 template <typename T> class CCheckQueueControl;
 
 /**
  * Queue for verifications that have to be performed.
  * The verifications are represented by a type T, which must provide an
  * operator(), returning a bool.
  *
  * One thread (the master) is assumed to push batches of verifications onto the
  * queue, where they are processed by N-1 worker threads. When the master is
  * done adding work, it temporarily joins the worker pool as an N'th worker,
  * until all jobs are done.
  */
 template <typename T> class CCheckQueue {
 private:
     //! Mutex to protect the inner state
     boost::mutex mutex;
 
     //! Worker threads block on this when out of work
     boost::condition_variable condWorker;
 
     //! Master thread blocks on this when out of work
     boost::condition_variable condMaster;
 
     //! The queue of elements to be processed.
     //! As the order of booleans doesn't matter, it is used as a LIFO (stack)
     std::vector<T> queue;
 
     //! The number of workers (including the master) that are idle.
     int nIdle{0};
 
     //! The total number of workers (including the master).
     int nTotal{0};
 
     //! The temporary evaluation result.
     bool fAllOk{true};
 
     /**
      * Number of verifications that haven't completed yet.
      * This includes elements that are no longer queued, but still in the
      * worker's own batches.
      */
     unsigned int nTodo{0};
 
     //! The maximum number of elements to be processed in one batch
     const unsigned int nBatchSize;
 
     std::vector<std::thread> m_worker_threads;
     bool m_request_stop{false};
 
     /** Internal function that does bulk of the verification work. */
     bool Loop(bool fMaster) {
         boost::condition_variable &cond = fMaster ? condMaster : condWorker;
         std::vector<T> vChecks;
         vChecks.reserve(nBatchSize);
         unsigned int nNow = 0;
         bool fOk = true;
         do {
             {
                 boost::unique_lock<boost::mutex> lock(mutex);
                 // first do the clean-up of the previous loop run (allowing us
                 // to do it in the same critsect)
                 if (nNow) {
                     fAllOk &= fOk;
                     nTodo -= nNow;
                     if (nTodo == 0 && !fMaster) {
                         // We processed the last element; inform the master it
                         // can exit and return the result
                         condMaster.notify_one();
                     }
                 } else {
                     // first iteration
                     nTotal++;
                 }
                 // logically, the do loop starts here
                 while (queue.empty() && !m_request_stop) {
                     if (fMaster && nTodo == 0) {
                         nTotal--;
                         bool fRet = fAllOk;
                         // reset the status for new work later
                         fAllOk = true;
                         // return the current status
                         return fRet;
                     }
                     nIdle++;
                     cond.wait(lock); // wait
                     nIdle--;
                 }
                 if (m_request_stop) {
                     return false;
                 }
                 // Decide how many work units to process now.
                 // * Do not try to do everything at once, but aim for
                 //   increasingly smaller batches so all workers finish
                 //   approximately simultaneously.
                 // * Try to account for idle jobs which will instantly start
                 //   helping.
                 // * Don't do batches smaller than 1 (duh), or larger than
                 //   nBatchSize.
                 nNow = std::max(
                     1U, std::min(nBatchSize, (unsigned int)queue.size() /
                                                  (nTotal + nIdle + 1)));
                 vChecks.resize(nNow);
                 for (unsigned int i = 0; i < nNow; i++) {
                     // We want the lock on the mutex to be as short as possible,
                     // so swap jobs from the global queue to the local batch
                     // vector instead of copying.
                     vChecks[i].swap(queue.back());
                     queue.pop_back();
                 }
                 // Check whether we need to do work at all
                 fOk = fAllOk;
             }
             // execute work
             for (T &check : vChecks) {
                 if (fOk) {
                     fOk = check();
                 }
             }
             vChecks.clear();
         } while (true);
     }
 
 public:
     //! Mutex to ensure only one concurrent CCheckQueueControl
     boost::mutex ControlMutex;
 
     //! Create a new check queue
     explicit CCheckQueue(unsigned int nBatchSizeIn)
         : nBatchSize(nBatchSizeIn) {}
 
     //! Create a pool of new worker threads.
     void StartWorkerThreads(const int threads_num) {
         {
             boost::unique_lock<boost::mutex> lock(mutex);
             nIdle = 0;
             nTotal = 0;
             fAllOk = true;
         }
         assert(m_worker_threads.empty());
         for (int n = 0; n < threads_num; ++n) {
             m_worker_threads.emplace_back([this, n]() {
                 util::ThreadRename(strprintf("scriptch.%i", n));
                 Loop(false /* worker thread */);
             });
         }
     }
 
-    //! Worker thread
-    void Thread() { Loop(false /* worker thread */); }
-
     //! Wait until execution finishes, and return whether all evaluations were
     //! successful.
     bool Wait() { return Loop(true /* master thread */); }
 
     //! Add a batch of checks to the queue
     void Add(std::vector<T> &vChecks) {
         boost::unique_lock<boost::mutex> lock(mutex);
         for (T &check : vChecks) {
             queue.push_back(T());
             check.swap(queue.back());
         }
         nTodo += vChecks.size();
         if (vChecks.size() == 1) {
             condWorker.notify_one();
         } else if (vChecks.size() > 1) {
             condWorker.notify_all();
         }
     }
 
     //! Stop all of the worker threads.
     void StopWorkerThreads() {
         {
             boost::unique_lock<boost::mutex> lock(mutex);
             m_request_stop = true;
         }
         condWorker.notify_all();
         for (std::thread &t : m_worker_threads) {
             t.join();
         }
         m_worker_threads.clear();
         boost::unique_lock<boost::mutex> lock(mutex);
         m_request_stop = false;
     }
 
     ~CCheckQueue() { assert(m_worker_threads.empty()); }
 };
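The work-claiming heuristic in Loop() above hands each thread queue.size() / (nTotal + nIdle + 1) jobs, clamped to the range [1, nBatchSize], so claimed batches shrink as the queue drains and all workers finish at roughly the same time. A standalone illustration of that clamp, using made-up inputs rather than values taken from the patch:

```cpp
// Standalone illustration of the batch-sizing formula used in Loop():
//     nNow = max(1, min(nBatchSize, queue.size() / (nTotal + nIdle + 1)))
// All inputs below are made-up examples, not values from the patch.
#include <algorithm>
#include <cstdio>

static unsigned int ClaimSize(unsigned int queue_size, unsigned int batch_max,
                              int total, int idle) {
    return std::max(
        1U, std::min(batch_max,
                     queue_size / static_cast<unsigned int>(total + idle + 1)));
}

int main() {
    // 4 active workers, none idle: divisor is 5.
    std::printf("%u\n", ClaimSize(3030, 128, 4, 0)); // 606 -> capped at 128
    std::printf("%u\n", ClaimSize(600, 128, 4, 0));  // 120
    std::printf("%u\n", ClaimSize(40, 128, 4, 0));   // 8
    std::printf("%u\n", ClaimSize(3, 128, 4, 0));    // 0 -> floored to 1
    return 0;
}
```

Counting idle workers in the divisor reflects the comment in Loop(): a sleeping worker wakes on notify and immediately starts claiming work, so it is budgeted for up front.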
 /**
  * RAII-style controller object for a CCheckQueue that guarantees the passed
  * queue is finished before continuing.
  */
 template <typename T> class CCheckQueueControl {
 private:
     CCheckQueue<T> *const pqueue;
     bool fDone;
 
 public:
     CCheckQueueControl() = delete;
     CCheckQueueControl(const CCheckQueueControl &) = delete;
     CCheckQueueControl &operator=(const CCheckQueueControl &) = delete;
     explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn)
         : pqueue(pqueueIn), fDone(false) {
         // passed queue is supposed to be unused, or nullptr
         if (pqueue != nullptr) {
             ENTER_CRITICAL_SECTION(pqueue->ControlMutex);
         }
     }
 
     bool Wait() {
         if (pqueue == nullptr) {
             return true;
         }
         bool fRet = pqueue->Wait();
         fDone = true;
         return fRet;
     }
 
     void Add(std::vector<T> &vChecks) {
         if (pqueue != nullptr) {
             pqueue->Add(vChecks);
         }
     }
 
     ~CCheckQueueControl() {
         if (!fDone) {
             Wait();
         }
         if (pqueue != nullptr) {
             LEAVE_CRITICAL_SECTION(pqueue->ControlMutex);
         }
     }
 };
 
 #endif // BITCOIN_CHECKQUEUE_H
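CCheckQueueControl wraps one verification session: the constructor takes ControlMutex so only a single controller can drive the queue at a time, and the destructor calls Wait() if the caller has not, so the queue is always drained before the controller goes out of scope. A hedged sketch of that contract, with a hypothetical DummyCheck type; passing a nullptr queue is the no-op path where Add() does nothing and Wait() reports success:

```cpp
// RAII sketch, assuming the in-tree checkqueue.h.
// DummyCheck and VerifyAll are hypothetical, for illustration only.
#include <checkqueue.h>

#include <utility>
#include <vector>

struct DummyCheck {
    bool result{true};
    bool operator()() { return result; }
    void swap(DummyCheck &other) noexcept { std::swap(result, other.result); }
};

static bool VerifyAll(CCheckQueue<DummyCheck> *queue,
                      std::vector<DummyCheck> &checks) {
    // Locks queue->ControlMutex when queue is non-null.
    CCheckQueueControl<DummyCheck> control(queue);
    control.Add(checks);   // checks are swapped into the queue's internal LIFO
    return control.Wait(); // blocks until nTodo == 0 and returns fAllOk
    // Had Wait() not been called, ~CCheckQueueControl() would call it.
}
```

StartWorkerThreads() only adds parallelism here; with no workers running, the thread calling Wait() simply processes the whole queue itself as the master.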