Changeset View
Changeset View
Standalone View
Standalone View
src/checkqueue.h
// Copyright (c) 2012-2015 The Bitcoin Core developers | // Copyright (c) 2012-2018 The Bitcoin Core developers | ||||
// Distributed under the MIT software license, see the accompanying | // Distributed under the MIT software license, see the accompanying | ||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php. | // file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
#ifndef BITCOIN_CHECKQUEUE_H | #ifndef BITCOIN_CHECKQUEUE_H | ||||
#define BITCOIN_CHECKQUEUE_H | #define BITCOIN_CHECKQUEUE_H | ||||
#include <sync.h> | |||||
#include <algorithm> | #include <algorithm> | ||||
#include <vector> | #include <vector> | ||||
#include <boost/thread/condition_variable.hpp> | #include <boost/thread/condition_variable.hpp> | ||||
#include <boost/thread/locks.hpp> | |||||
#include <boost/thread/mutex.hpp> | #include <boost/thread/mutex.hpp> | ||||
template <typename T> class CCheckQueueControl; | template <typename T> class CCheckQueueControl; | ||||
/** | /** | ||||
* Queue for verifications that have to be performed. | * Queue for verifications that have to be performed. | ||||
* The verifications are represented by a type T, which must provide an | * The verifications are represented by a type T, which must provide an | ||||
* operator(), returning a bool. | * operator(), returning a bool. | ||||
Show All 29 Lines | private: | ||||
/** | /** | ||||
* Number of verifications that haven't completed yet. | * Number of verifications that haven't completed yet. | ||||
* This includes elements that are no longer queued, but still in the | * This includes elements that are no longer queued, but still in the | ||||
* worker's own batches. | * worker's own batches. | ||||
*/ | */ | ||||
unsigned int nTodo; | unsigned int nTodo; | ||||
//! Whether we're shutting down. | |||||
bool fQuit; | |||||
//! The maximum number of elements to be processed in one batch | //! The maximum number of elements to be processed in one batch | ||||
unsigned int nBatchSize; | unsigned int nBatchSize; | ||||
/** Internal function that does bulk of the verification work. */ | /** Internal function that does bulk of the verification work. */ | ||||
bool Loop(bool fMaster = false) { | bool Loop(bool fMaster = false) { | ||||
boost::condition_variable &cond = fMaster ? condMaster : condWorker; | boost::condition_variable &cond = fMaster ? condMaster : condWorker; | ||||
std::vector<T> vChecks; | std::vector<T> vChecks; | ||||
vChecks.reserve(nBatchSize); | vChecks.reserve(nBatchSize); | ||||
unsigned int nNow = 0; | unsigned int nNow = 0; | ||||
bool fOk = true; | bool fOk = true; | ||||
do { | do { | ||||
{ | { | ||||
boost::unique_lock<boost::mutex> lock(mutex); | boost::unique_lock<boost::mutex> lock(mutex); | ||||
// first do the clean-up of the previous loop run (allowing us | // first do the clean-up of the previous loop run (allowing us | ||||
// to do it in the same critsect) | // to do it in the same critsect) | ||||
if (nNow) { | if (nNow) { | ||||
fAllOk &= fOk; | fAllOk &= fOk; | ||||
nTodo -= nNow; | nTodo -= nNow; | ||||
if (nTodo == 0 && !fMaster) | if (nTodo == 0 && !fMaster) { | ||||
// We processed the last element; inform the master it | // We processed the last element; inform the master it | ||||
// can exit and return the result | // can exit and return the result | ||||
condMaster.notify_one(); | condMaster.notify_one(); | ||||
} | |||||
} else { | } else { | ||||
// first iteration | // first iteration | ||||
nTotal++; | nTotal++; | ||||
} | } | ||||
// logically, the do loop starts here | // logically, the do loop starts here | ||||
while (queue.empty()) { | while (queue.empty()) { | ||||
if ((fMaster || fQuit) && nTodo == 0) { | if (fMaster && nTodo == 0) { | ||||
nTotal--; | nTotal--; | ||||
bool fRet = fAllOk; | bool fRet = fAllOk; | ||||
// reset the status for new work later | // reset the status for new work later | ||||
if (fMaster) fAllOk = true; | if (fMaster) { | ||||
fAllOk = true; | |||||
} | |||||
// return the current status | // return the current status | ||||
return fRet; | return fRet; | ||||
} | } | ||||
nIdle++; | nIdle++; | ||||
cond.wait(lock); // wait | cond.wait(lock); // wait | ||||
nIdle--; | nIdle--; | ||||
} | } | ||||
// Decide how many work units to process now. | // Decide how many work units to process now. | ||||
Show All 15 Lines | bool Loop(bool fMaster = false) { | ||||
vChecks[i].swap(queue.back()); | vChecks[i].swap(queue.back()); | ||||
queue.pop_back(); | queue.pop_back(); | ||||
} | } | ||||
// Check whether we need to do work at all | // Check whether we need to do work at all | ||||
fOk = fAllOk; | fOk = fAllOk; | ||||
} | } | ||||
// execute work | // execute work | ||||
for (T &check : vChecks) { | for (T &check : vChecks) { | ||||
if (fOk) fOk = check(); | if (fOk) { | ||||
fOk = check(); | |||||
} | |||||
} | } | ||||
vChecks.clear(); | vChecks.clear(); | ||||
} while (true); | } while (true); | ||||
} | } | ||||
public: | public: | ||||
//! Mutex to ensure only one concurrent CCheckQueueControl | |||||
boost::mutex ControlMutex; | |||||
//! Create a new check queue | //! Create a new check queue | ||||
CCheckQueue(unsigned int nBatchSizeIn) | explicit CCheckQueue(unsigned int nBatchSizeIn) | ||||
: nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), | : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), | ||||
nBatchSize(nBatchSizeIn) {} | nBatchSize(nBatchSizeIn) {} | ||||
//! Worker thread | //! Worker thread | ||||
void Thread() { Loop(); } | void Thread() { Loop(); } | ||||
//! Wait until execution finishes, and return whether all evaluations were | //! Wait until execution finishes, and return whether all evaluations were | ||||
//! successful. | //! successful. | ||||
bool Wait() { return Loop(true); } | bool Wait() { return Loop(true); } | ||||
//! Add a batch of checks to the queue | //! Add a batch of checks to the queue | ||||
void Add(std::vector<T> &vChecks) { | void Add(std::vector<T> &vChecks) { | ||||
boost::unique_lock<boost::mutex> lock(mutex); | boost::unique_lock<boost::mutex> lock(mutex); | ||||
for (T &check : vChecks) { | for (T &check : vChecks) { | ||||
queue.push_back(std::move(check)); | queue.push_back(T()); | ||||
check.swap(queue.back()); | |||||
} | } | ||||
nTodo += vChecks.size(); | nTodo += vChecks.size(); | ||||
if (vChecks.size() == 1) { | if (vChecks.size() == 1) { | ||||
condWorker.notify_one(); | condWorker.notify_one(); | ||||
} else if (vChecks.size() > 1) { | } else if (vChecks.size() > 1) { | ||||
condWorker.notify_all(); | condWorker.notify_all(); | ||||
} | } | ||||
} | } | ||||
~CCheckQueue() {} | ~CCheckQueue() {} | ||||
bool IsIdle() { | |||||
boost::unique_lock<boost::mutex> lock(mutex); | |||||
return (nTotal == nIdle && nTodo == 0 && fAllOk == true); | |||||
} | |||||
}; | }; | ||||
/** | /** | ||||
* RAII-style controller object for a CCheckQueue that guarantees the passed | * RAII-style controller object for a CCheckQueue that guarantees the passed | ||||
* queue is finished before continuing. | * queue is finished before continuing. | ||||
*/ | */ | ||||
template <typename T> class CCheckQueueControl { | template <typename T> class CCheckQueueControl { | ||||
private: | private: | ||||
CCheckQueue<T> *pqueue; | CCheckQueue<T> *const pqueue; | ||||
bool fDone; | bool fDone; | ||||
public: | public: | ||||
CCheckQueueControl(CCheckQueue<T> *pqueueIn) | CCheckQueueControl() = delete; | ||||
CCheckQueueControl(const CCheckQueueControl &) = delete; | |||||
CCheckQueueControl &operator=(const CCheckQueueControl &) = delete; | |||||
explicit CCheckQueueControl(CCheckQueue<T> *const pqueueIn) | |||||
: pqueue(pqueueIn), fDone(false) { | : pqueue(pqueueIn), fDone(false) { | ||||
// passed queue is supposed to be unused, or nullptr | // passed queue is supposed to be unused, or nullptr | ||||
if (pqueue != nullptr) { | if (pqueue != nullptr) { | ||||
bool isIdle = pqueue->IsIdle(); | ENTER_CRITICAL_SECTION(pqueue->ControlMutex); | ||||
assert(isIdle); | |||||
} | } | ||||
} | } | ||||
bool Wait() { | bool Wait() { | ||||
if (pqueue == nullptr) return true; | if (pqueue == nullptr) { | ||||
return true; | |||||
} | |||||
bool fRet = pqueue->Wait(); | bool fRet = pqueue->Wait(); | ||||
fDone = true; | fDone = true; | ||||
return fRet; | return fRet; | ||||
} | } | ||||
void Add(std::vector<T> &vChecks) { | void Add(std::vector<T> &vChecks) { | ||||
if (pqueue != nullptr) pqueue->Add(vChecks); | if (pqueue != nullptr) { | ||||
pqueue->Add(vChecks); | |||||
} | |||||
} | } | ||||
~CCheckQueueControl() { | ~CCheckQueueControl() { | ||||
if (!fDone) Wait(); | if (!fDone) { | ||||
Wait(); | |||||
} | |||||
if (pqueue != nullptr) { | |||||
LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); | |||||
} | |||||
} | } | ||||
}; | }; | ||||
#endif // BITCOIN_CHECKQUEUE_H | #endif // BITCOIN_CHECKQUEUE_H |