diff --git a/src/Makefile.test.include b/src/Makefile.test.include --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -44,6 +44,7 @@ test/cashaddrenc_tests.cpp \ test/checkdatasig_tests.cpp \ test/checkpoints_tests.cpp \ + test/checkqueue_tests.cpp \ test/coins_tests.cpp \ test/compress_tests.cpp \ test/config_tests.cpp \ diff --git a/src/checkqueue.h b/src/checkqueue.h --- a/src/checkqueue.h +++ b/src/checkqueue.h @@ -1,15 +1,16 @@ -// Copyright (c) 2012-2015 The Bitcoin Core developers +// Copyright (c) 2012-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_CHECKQUEUE_H #define BITCOIN_CHECKQUEUE_H +#include + #include #include #include -#include #include template class CCheckQueueControl; @@ -55,9 +56,6 @@ */ unsigned int nTodo; - //! Whether we're shutting down. - bool fQuit; - //! The maximum number of elements to be processed in one batch unsigned int nBatchSize; @@ -76,21 +74,24 @@ if (nNow) { fAllOk &= fOk; nTodo -= nNow; - if (nTodo == 0 && !fMaster) + if (nTodo == 0 && !fMaster) { // We processed the last element; inform the master it // can exit and return the result condMaster.notify_one(); + } } else { // first iteration nTotal++; } // logically, the do loop starts here while (queue.empty()) { - if ((fMaster || fQuit) && nTodo == 0) { + if (fMaster && nTodo == 0) { nTotal--; bool fRet = fAllOk; // reset the status for new work later - if (fMaster) fAllOk = true; + if (fMaster) { + fAllOk = true; + } // return the current status return fRet; } @@ -122,16 +123,21 @@ } // execute work for (T &check : vChecks) { - if (fOk) fOk = check(); + if (fOk) { + fOk = check(); + } } vChecks.clear(); } while (true); } public: + //! Mutex to ensure only one concurrent CCheckQueueControl + boost::mutex ControlMutex; + //! 
Create a new check queue - CCheckQueue(unsigned int nBatchSizeIn) - : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), + explicit CCheckQueue(unsigned int nBatchSizeIn) + : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), nBatchSize(nBatchSizeIn) {} //! Worker thread @@ -145,7 +151,8 @@ void Add(std::vector &vChecks) { boost::unique_lock lock(mutex); for (T &check : vChecks) { - queue.push_back(std::move(check)); + queue.push_back(T()); + check.swap(queue.back()); } nTodo += vChecks.size(); if (vChecks.size() == 1) { @@ -156,11 +163,6 @@ } ~CCheckQueue() {} - - bool IsIdle() { - boost::unique_lock lock(mutex); - return (nTotal == nIdle && nTodo == 0 && fAllOk == true); - } }; /** @@ -169,32 +171,43 @@ */ template class CCheckQueueControl { private: - CCheckQueue *pqueue; + CCheckQueue *const pqueue; bool fDone; public: - CCheckQueueControl(CCheckQueue *pqueueIn) + CCheckQueueControl() = delete; + CCheckQueueControl(const CCheckQueueControl &) = delete; + CCheckQueueControl &operator=(const CCheckQueueControl &) = delete; + explicit CCheckQueueControl(CCheckQueue *const pqueueIn) : pqueue(pqueueIn), fDone(false) { // passed queue is supposed to be unused, or nullptr if (pqueue != nullptr) { - bool isIdle = pqueue->IsIdle(); - assert(isIdle); + ENTER_CRITICAL_SECTION(pqueue->ControlMutex); } } bool Wait() { - if (pqueue == nullptr) return true; + if (pqueue == nullptr) { + return true; + } bool fRet = pqueue->Wait(); fDone = true; return fRet; } void Add(std::vector &vChecks) { - if (pqueue != nullptr) pqueue->Add(vChecks); + if (pqueue != nullptr) { + pqueue->Add(vChecks); + } } ~CCheckQueueControl() { - if (!fDone) Wait(); + if (!fDone) { + Wait(); + } + if (pqueue != nullptr) { + LEAVE_CRITICAL_SECTION(pqueue->ControlMutex); + } } }; diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -62,6 +62,7 @@ cashaddrenc_tests.cpp checkdatasig_tests.cpp checkpoints_tests.cpp + 
checkqueue_tests.cpp coins_tests.cpp compress_tests.cpp config_tests.cpp diff --git a/src/test/checkqueue_tests.cpp b/src/test/checkqueue_tests.cpp new file mode 100644 --- /dev/null +++ b/src/test/checkqueue_tests.cpp @@ -0,0 +1,423 @@ +// Copyright (c) 2012-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +// BasicTestingSetup not sufficient because nScriptCheckThreads is not set +// otherwise. +BOOST_FIXTURE_TEST_SUITE(checkqueue_tests, TestingSetup) + +static const unsigned int QUEUE_BATCH_SIZE = 128; + +struct FakeCheck { + bool operator()() { return true; } + void swap(FakeCheck &x){}; +}; + +struct FakeCheckCheckCompletion { + static std::atomic n_calls; + bool operator()() { + n_calls.fetch_add(1, std::memory_order_relaxed); + return true; + } + void swap(FakeCheckCheckCompletion &x){}; +}; + +struct FailingCheck { + bool fails; + FailingCheck(bool _fails) : fails(_fails){}; + FailingCheck() : fails(true){}; + bool operator()() { return !fails; } + void swap(FailingCheck &x) { std::swap(fails, x.fails); }; +}; + +struct UniqueCheck { + static std::mutex m; + static std::unordered_multiset results; + size_t check_id; + UniqueCheck(size_t check_id_in) : check_id(check_id_in){}; + UniqueCheck() : check_id(0){}; + bool operator()() { + std::lock_guard l(m); + results.insert(check_id); + return true; + } + void swap(UniqueCheck &x) { std::swap(x.check_id, check_id); }; +}; + +struct MemoryCheck { + static std::atomic fake_allocated_memory; + bool b{false}; + bool operator()() { return true; } + MemoryCheck(){}; + MemoryCheck(const MemoryCheck &x) { + // We have to do this to make sure that destructor calls are paired + // + // Really, copy constructor should be deletable, but 
CCheckQueue breaks + // if it is deleted because of internal push_back. + fake_allocated_memory.fetch_add(b, std::memory_order_relaxed); + }; + MemoryCheck(bool b_) : b(b_) { + fake_allocated_memory.fetch_add(b, std::memory_order_relaxed); + }; + ~MemoryCheck() { + fake_allocated_memory.fetch_sub(b, std::memory_order_relaxed); + }; + void swap(MemoryCheck &x) { std::swap(b, x.b); }; +}; + +struct FrozenCleanupCheck { + static std::atomic nFrozen; + static std::condition_variable cv; + static std::mutex m; + // Freezing can't be the default initialized behavior given how the queue + // swaps in default initialized Checks. + bool should_freeze{false}; + bool operator()() { return true; } + FrozenCleanupCheck() {} + ~FrozenCleanupCheck() { + if (should_freeze) { + std::unique_lock l(m); + nFrozen.store(1, std::memory_order_relaxed); + cv.notify_one(); + cv.wait( + l, [] { return nFrozen.load(std::memory_order_relaxed) == 0; }); + } + } + void swap(FrozenCleanupCheck &x) { + std::swap(should_freeze, x.should_freeze); + }; +}; + +// Static Allocations +std::mutex FrozenCleanupCheck::m{}; +std::atomic FrozenCleanupCheck::nFrozen{0}; +std::condition_variable FrozenCleanupCheck::cv{}; +std::mutex UniqueCheck::m; +std::unordered_multiset UniqueCheck::results; +std::atomic FakeCheckCheckCompletion::n_calls{0}; +std::atomic MemoryCheck::fake_allocated_memory{0}; + +// Queue Typedefs +typedef CCheckQueue Correct_Queue; +typedef CCheckQueue Standard_Queue; +typedef CCheckQueue Failing_Queue; +typedef CCheckQueue Unique_Queue; +typedef CCheckQueue Memory_Queue; +typedef CCheckQueue FrozenCleanup_Queue; + +/** This test case checks that the CCheckQueue works properly + * with each specified size_t Checks pushed. 
+ */ +static void Correct_Queue_range(std::vector range) { + auto small_queue = + std::unique_ptr(new Correct_Queue{QUEUE_BATCH_SIZE}); + boost::thread_group tg; + for (auto x = 0; x < nScriptCheckThreads; ++x) { + tg.create_thread([&] { small_queue->Thread(); }); + } + // Make vChecks here to save on malloc (this test can be slow...) + std::vector vChecks; + for (const size_t i : range) { + size_t total = i; + FakeCheckCheckCompletion::n_calls = 0; + CCheckQueueControl control(small_queue.get()); + while (total) { + vChecks.resize(std::min(total, (size_t)InsecureRandRange(10))); + total -= vChecks.size(); + control.Add(vChecks); + } + BOOST_REQUIRE(control.Wait()); + if (FakeCheckCheckCompletion::n_calls != i) { + BOOST_REQUIRE_EQUAL(FakeCheckCheckCompletion::n_calls, i); + BOOST_TEST_MESSAGE("Failure on trial " + << i << " expected, got " + << FakeCheckCheckCompletion::n_calls); + } + } + tg.interrupt_all(); + tg.join_all(); +} + +/** Test that 0 checks is correct + */ +BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Zero) { + std::vector range; + range.push_back((size_t)0); + Correct_Queue_range(range); +} +/** Test that 1 check is correct + */ +BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_One) { + std::vector range; + range.push_back((size_t)1); + Correct_Queue_range(range); +} +/** Test that MAX check is correct + */ +BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Max) { + std::vector range; + range.push_back(100000); + Correct_Queue_range(range); +} +/** Test that random numbers of checks are correct + */ +BOOST_AUTO_TEST_CASE(test_CheckQueue_Correct_Random) { + std::vector range; + range.reserve(100000 / 1000); + for (size_t i = 2; i < 100000; + i += std::max((size_t)1, (size_t)InsecureRandRange(std::min( + (size_t)1000, ((size_t)100000) - i)))) + range.push_back(i); + Correct_Queue_range(range); +} + +/** Test that failing checks are caught */ +BOOST_AUTO_TEST_CASE(test_CheckQueue_Catches_Failure) { + auto fail_queue = + std::unique_ptr(new 
Failing_Queue{QUEUE_BATCH_SIZE}); + + boost::thread_group tg; + for (auto x = 0; x < nScriptCheckThreads; ++x) { + tg.create_thread([&] { fail_queue->Thread(); }); + } + + for (size_t i = 0; i < 1001; ++i) { + CCheckQueueControl control(fail_queue.get()); + size_t remaining = i; + while (remaining) { + size_t r = InsecureRandRange(10); + + std::vector vChecks; + vChecks.reserve(r); + for (size_t k = 0; k < r && remaining; k++, remaining--) + vChecks.emplace_back(remaining == 1); + control.Add(vChecks); + } + bool success = control.Wait(); + if (i > 0) { + BOOST_REQUIRE(!success); + } else if (i == 0) { + BOOST_REQUIRE(success); + } + } + tg.interrupt_all(); + tg.join_all(); +} +// Test that a block validation which fails does not interfere with +// future blocks, i.e., the bad state is cleared. +BOOST_AUTO_TEST_CASE(test_CheckQueue_Recovers_From_Failure) { + auto fail_queue = + std::unique_ptr(new Failing_Queue{QUEUE_BATCH_SIZE}); + boost::thread_group tg; + for (auto x = 0; x < nScriptCheckThreads; ++x) { + tg.create_thread([&] { fail_queue->Thread(); }); + } + + for (auto times = 0; times < 10; ++times) { + for (const bool end_fails : {true, false}) { + CCheckQueueControl control(fail_queue.get()); + { + std::vector vChecks; + vChecks.resize(100, false); + vChecks[99] = end_fails; + control.Add(vChecks); + } + bool r = control.Wait(); + BOOST_REQUIRE(r != end_fails); + } + } + tg.interrupt_all(); + tg.join_all(); +} + +// Test that unique checks are actually all called individually, rather than +// just one check being called repeatedly. 
Test that checks are not called +// more than once as well +BOOST_AUTO_TEST_CASE(test_CheckQueue_UniqueCheck) { + auto queue = + std::unique_ptr(new Unique_Queue{QUEUE_BATCH_SIZE}); + boost::thread_group tg; + for (auto x = 0; x < nScriptCheckThreads; ++x) { + tg.create_thread([&] { queue->Thread(); }); + } + + size_t COUNT = 100000; + size_t total = COUNT; + { + CCheckQueueControl control(queue.get()); + while (total) { + size_t r = InsecureRandRange(10); + std::vector vChecks; + for (size_t k = 0; k < r && total; k++) + vChecks.emplace_back(--total); + control.Add(vChecks); + } + } + bool r = true; + BOOST_REQUIRE_EQUAL(UniqueCheck::results.size(), COUNT); + for (size_t i = 0; i < COUNT; ++i) + r = r && UniqueCheck::results.count(i) == 1; + BOOST_REQUIRE(r); + tg.interrupt_all(); + tg.join_all(); +} + +// Test that blocks which might allocate lots of memory free their memory +// aggressively. +// +// This test attempts to catch a pathological case whereby lazily freeing +// checks might leave a check un-swapped out, and decreasing by 1 each +// time could leave the data hanging across a sequence of blocks. 
+BOOST_AUTO_TEST_CASE(test_CheckQueue_Memory) { + auto queue = + std::unique_ptr(new Memory_Queue{QUEUE_BATCH_SIZE}); + boost::thread_group tg; + for (auto x = 0; x < nScriptCheckThreads; ++x) { + tg.create_thread([&] { queue->Thread(); }); + } + for (size_t i = 0; i < 1000; ++i) { + size_t total = i; + { + CCheckQueueControl control(queue.get()); + while (total) { + size_t r = InsecureRandRange(10); + std::vector vChecks; + for (size_t k = 0; k < r && total; k++) { + total--; + // Each iteration leaves data at the front, back, and middle + // to catch any sort of deallocation failure + vChecks.emplace_back(total == 0 || total == i || + total == i / 2); + } + control.Add(vChecks); + } + } + BOOST_REQUIRE_EQUAL(MemoryCheck::fake_allocated_memory, 0U); + } + tg.interrupt_all(); + tg.join_all(); +} + +// Test that a new verification cannot occur until all checks +// have been destructed +BOOST_AUTO_TEST_CASE(test_CheckQueue_FrozenCleanup) { + auto queue = std::unique_ptr( + new FrozenCleanup_Queue{QUEUE_BATCH_SIZE}); + boost::thread_group tg; + bool fails = false; + for (auto x = 0; x < nScriptCheckThreads; ++x) { + tg.create_thread([&] { queue->Thread(); }); + } + std::thread t0([&]() { + CCheckQueueControl control(queue.get()); + std::vector vChecks(1); + // Freezing can't be the default initialized behavior given how the + // queue + // swaps in default initialized Checks (otherwise freezing destructor + // would get called twice). 
+ vChecks[0].should_freeze = true; + control.Add(vChecks); + control.Wait(); // Hangs here + }); + { + std::unique_lock l(FrozenCleanupCheck::m); + // Wait until the queue has finished all jobs and frozen + FrozenCleanupCheck::cv.wait( + l, []() { return FrozenCleanupCheck::nFrozen == 1; }); + } + // Try to get control of the queue a bunch of times + for (auto x = 0; x < 100 && !fails; ++x) { + fails = queue->ControlMutex.try_lock(); + } + { + // Unfreeze (we need the lock in case of spurious wakeup) + std::unique_lock l(FrozenCleanupCheck::m); + FrozenCleanupCheck::nFrozen = 0; + } + // Awaken frozen destructor + FrozenCleanupCheck::cv.notify_one(); + // Wait for control to finish + t0.join(); + tg.interrupt_all(); + tg.join_all(); + BOOST_REQUIRE(!fails); +} + +/** Test that CCheckQueueControl is threadsafe */ +BOOST_AUTO_TEST_CASE(test_CheckQueueControl_Locks) { + auto queue = + std::unique_ptr(new Standard_Queue{QUEUE_BATCH_SIZE}); + { + boost::thread_group tg; + std::atomic nThreads{0}; + std::atomic fails{0}; + for (size_t i = 0; i < 3; ++i) { + tg.create_thread([&] { + CCheckQueueControl control(queue.get()); + // While sleeping, no other thread should execute to this point + auto observed = ++nThreads; + MilliSleep(10); + fails += observed != nThreads; + }); + } + tg.join_all(); + BOOST_REQUIRE_EQUAL(fails, 0); + } + { + boost::thread_group tg; + std::mutex m; + std::condition_variable cv; + bool has_lock{false}; + bool has_tried{false}; + bool done{false}; + bool done_ack{false}; + { + std::unique_lock l(m); + tg.create_thread([&] { + CCheckQueueControl control(queue.get()); + std::unique_lock ll(m); + has_lock = true; + cv.notify_one(); + cv.wait(ll, [&] { return has_tried; }); + done = true; + cv.notify_one(); + // Wait until the done is acknowledged + // + cv.wait(ll, [&] { return done_ack; }); + }); + // Wait for thread to get the lock + cv.wait(l, [&]() { return has_lock; }); + bool fails = false; + for (auto x = 0; x < 100 && !fails; ++x) { + fails = 
queue->ControlMutex.try_lock(); + } + has_tried = true; + cv.notify_one(); + cv.wait(l, [&]() { return done; }); + // Acknowledge the done + done_ack = true; + cv.notify_one(); + BOOST_REQUIRE(!fails); + } + tg.join_all(); + } +} +BOOST_AUTO_TEST_SUITE_END()