diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -76,6 +76,7 @@
 	fs.cpp
 	logging.cpp
 	random.cpp
+	rcu.cpp
 	rpc/protocol.cpp
 	support/cleanse.cpp
 	support/lockedpool.cpp
diff --git a/src/Makefile.am b/src/Makefile.am
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -159,6 +159,7 @@
   pow.h \
   protocol.h \
   random.h \
+  rcu.h \
   reverse_iterator.h \
   reverselock.h \
   rpc/blockchain.h \
@@ -427,6 +428,7 @@
   fs.cpp \
   logging.cpp \
   random.cpp \
+  rcu.cpp \
   rpc/protocol.cpp \
   support/cleanse.cpp \
   sync.cpp \
diff --git a/src/Makefile.test.include b/src/Makefile.test.include
--- a/src/Makefile.test.include
+++ b/src/Makefile.test.include
@@ -79,6 +79,7 @@
   test/prevector_tests.cpp \
   test/raii_event_tests.cpp \
   test/random_tests.cpp \
+  test/rcu_tests.cpp \
   test/reverselock_tests.cpp \
   test/rpc_tests.cpp \
   test/rwcollection_tests.cpp \
diff --git a/src/rcu.h b/src/rcu.h
new file mode 100644
--- /dev/null
+++ b/src/rcu.h
@@ -0,0 +1,64 @@
+// Copyright (c) 2018 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#ifndef BITCOIN_RCU_H
+#define BITCOIN_RCU_H
+
+#include <boost/noncopyable.hpp>
+
+#include <atomic>
+#include <cassert>
+#include <cstdint>
+#include <memory>
+
+class RCUInfos;
+class RCUReadLock;
+
+class RCUInfos {
+    std::atomic<RCUInfos *> next;
+    std::atomic<uint64_t> state;
+
+    // The largest revision possible means unlocked.
+    static const uint64_t UNLOCKED = -uint64_t(1);
+
+    RCUInfos();
+    ~RCUInfos();
+
+    void readLock() {
+        assert(!isLocked());
+        state.store(revision.load());
+    }
+
+    void readFree() {
+        assert(isLocked());
+        state.store(UNLOCKED);
+    }
+
+    bool isLocked() const { return state.load() != UNLOCKED; }
+
+    void synchronize();
+    bool hasSynced(uint64_t syncRev);
+
+    friend class RCULock;
+    friend struct RCUTest;
+
+    static std::atomic<uint64_t> revision;
+    static thread_local RCUInfos infos;
+};
+
+class RCULock : public boost::noncopyable {
+    RCUInfos *infos;
+
+    RCULock(RCUInfos *infosIn) : infos(infosIn) { infos->readLock(); }
+    friend class RCUInfos;
+
+public:
+    RCULock() : RCULock(&RCUInfos::infos) {}
+    ~RCULock() { infos->readFree(); }
+
+    static void synchronize() { RCUInfos::infos.synchronize(); }
+    static bool isLocked() { return RCUInfos::infos.isLocked(); }
+};
+
+#endif // BITCOIN_RCU_H
diff --git a/src/rcu.cpp b/src/rcu.cpp
new file mode 100644
--- /dev/null
+++ b/src/rcu.cpp
@@ -0,0 +1,187 @@
+// Copyright (c) 2018 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "rcu.h"
+#include "sync.h"
+
+#include <thread>
+
+std::atomic<uint64_t> RCUInfos::revision(0);
+thread_local RCUInfos RCUInfos::infos{};
+
+/**
+ * How many times a busy loop runs before yielding.
+ */
+static const uint64_t RCU_ACTIVE_LOOP_COUNT = 30;
+
+/**
+ * We maintain a linked list of all the RCUInfos for each active thread. Upon
+ * start, a new thread adds itself to the head of the linked list and the node
+ * is then removed when the thread shuts down.
+ *
+ * Insertion is fairly straightforward. The first step is to set the next
+ * pointer of the node being inserted as the first node in the list as follows:
+ *
+ *    threadInfos -> Node -> ...
+ *                    ^
+ *          Nadded ---|
+ *
+ * The second step is to update threadInfos to point to the inserted node. This
+ * is done using compare and swap.
+ * If the head of the list changed during this
+ * process - for instance due to another insertion - CAS will fail and we can
+ * start again.
+ *
+ *    threadInfos    Node -> ...
+ *        |            ^
+ *        \-> Nadded --|
+ *
+ * Deletion is a slightly more complex process. The general idea is to go over
+ * the list, find the parent of the item we want to remove and set its next
+ * pointer to jump over it.
+ *
+ *    Nparent -> Ndelete -> Nchild
+ *
+ *    Nparent    Ndelete -> Nchild
+ *       |                   ^
+ *       \-------------------|
+ *
+ * We run into problems when a node is deleted concurrently with another node
+ * being inserted. Hopefully, we can solve that problem with CAS as well.
+ *
+ *    threadInfos -> Ndelete -> Nchild
+ *                    ^
+ *          Nadded ---|
+ *
+ * The insertion will try to update threadInfos to point to Nadded, while the
+ * deletion will try to update it to point to Nchild. Whichever goes first will
+ * cause the other to fail its CAS and restart its process.
+ *
+ *    threadInfos    Ndelete -> Nchild
+ *        |            ^
+ *        \--> Nadded -|
+ *
+ * After a successful insertion, threadInfos now points to Nadded, and the CAS
+ * to move it to Nchild will fail, causing the deletion process to restart from
+ * scratch.
+ *
+ *       /----------------------|
+ *       |                      V
+ *    threadInfos    Ndelete -> Nchild
+ *                    ^
+ *          Nadded ---|
+ *
+ * After a successful deletion, threadInfos now points to Nchild and the CAS to
+ * move it to Nadded will fail, causing the insertion process to fail.
+ *
+ * We also run into problems when several nodes are deleted concurrently.
+ * Because it is not possible to read Ndelete->next and update Nparent->next
+ * atomically, we may end up setting Nparent->next to a stale value if Nchild
+ * is deleted.
+ *
+ *       /----------------------|
+ *       |                      V
+ *    Nparent    Ndelete    Nchild -> Ngrandchild
+ *       |                   ^
+ *       \-------------------|
+ *
+ * This would cause Nchild to be 'resurrected', which is obviously a problem.
+ * In order to avoid this problem, we make sure that no concurrent deletion
+ * takes place using a good old mutex.
+ * Using a mutex for deletion also ensures we are safe
+ * from the ABA problem.
+ *
+ * Once a node is deleted from the list, we cannot destroy it right away.
+ * Readers do not hold the mutex and may still be using that node. We need to
+ * leverage RCU to make sure all the readers have finished their work before
+ * allowing the node to be destroyed. We need to keep the mutex during that
+ * process, because we just removed our thread from the list of threads to
+ * wait for. A concurrent deletion would not wait for us and may end up
+ * deleting data we rely on as a result.
+ */
+std::atomic<RCUInfos *> threadInfos{nullptr};
+CCriticalSection csThreadInfosDelete;
+
+RCUInfos::RCUInfos() : next(nullptr), state(0) {
+    while (true) {
+        RCUInfos *head = threadInfos.load();
+        next.store(head);
+        if (threadInfos.compare_exchange_weak(head, this)) {
+            break;
+        }
+    }
+
+    // Release the lock.
+    readFree();
+}
+
+RCUInfos::~RCUInfos() {
+    while (true) {
+        LOCK(csThreadInfosDelete);
+
+        std::atomic<RCUInfos *> *ptr;
+
+        {
+            RCULock lock(this);
+            ptr = &threadInfos;
+            while (true) {
+                RCUInfos *current = ptr->load();
+                if (current == this) {
+                    break;
+                }
+
+                assert(current != nullptr);
+                ptr = &current->next;
+            }
+        }
+
+        /**
+         * We have our node and the parent is ready to be updated.
+         * NB: The CAS operation only checks for *ptr and not for next. This
+         * would be a big problem in the general case, but because we only
+         * insert at the tip of the list and cannot have concurrent deletion
+         * thanks to the use of a mutex, we are safe.
+         */
+        RCUInfos *foo = this;
+        if (!ptr->compare_exchange_weak(foo, next.load())) {
+            continue;
+        }
+
+        /**
+         * We now wait for possible readers to go past the synchronization
+         * point. We need to do so while holding the lock as this operation
+         * requires us to be a reader, but we just removed ourselves from the
+         * list of readers to check and may therefore not be waited for.
+         */
+        synchronize();
+        break;
+    }
+}
+
+void RCUInfos::synchronize() {
+    uint64_t count = 0, syncRev = ++revision;
+
+    // Loop until all threads have synced.
+    while (!hasSynced(syncRev)) {
+        if (count++ >= RCU_ACTIVE_LOOP_COUNT) {
+            // Make sure we don't starve the system by spinning too long
+            // without yielding.
+            std::this_thread::yield();
+        }
+    }
+}
+
+bool RCUInfos::hasSynced(uint64_t syncRev) {
+    // Go over the list and check all threads are past the synchronization
+    // point.
+    RCULock lock(this);
+    RCUInfos *current = threadInfos.load();
+    while (current != nullptr) {
+        // If the current thread is not up to speed, bail.
+        if (current->state < syncRev) {
+            return false;
+        }
+
+        current = current->next.load();
+    }
+
+    return true;
+}
diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt
--- a/src/test/CMakeLists.txt
+++ b/src/test/CMakeLists.txt
@@ -96,6 +96,7 @@
 	prevector_tests.cpp
 	raii_event_tests.cpp
 	random_tests.cpp
+	rcu_tests.cpp
 	reverselock_tests.cpp
 	rpc_tests.cpp
 	rwcollection_tests.cpp
diff --git a/src/test/rcu_tests.cpp b/src/test/rcu_tests.cpp
new file mode 100644
--- /dev/null
+++ b/src/test/rcu_tests.cpp
@@ -0,0 +1,133 @@
+// Copyright (c) 2018 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+#include "rcu.h"
+
+#include "test/test_bitcoin.h"
+
+#include <boost/test/unit_test.hpp>
+
+#include <thread>
+
+struct RCUTest {
+    static uint64_t getRevision() { return RCUInfos::revision.load(); }
+
+    static uint64_t hasSynced(uint64_t syncRev) {
+        return RCUInfos::infos.hasSynced(syncRev);
+    }
+};
+
+BOOST_FIXTURE_TEST_SUITE(rcu_tests, BasicTestingSetup)
+
+enum RCUTestStep {
+    Init,
+    Locked,
+    LockAck,
+    RCULocked,
+    Synchronizing,
+    Synchronized,
+};
+
+#define WAIT_FOR_STEP(step)                                                    \
+    do {                                                                       \
+        cond.notify_all();                                                     \
+    } while (!cond.wait_for(lock, std::chrono::milliseconds(1),                \
+                            [&] { return otherstep == step; }))
+
+void synchronize(std::atomic<RCUTestStep> &step,
+                 const std::atomic<RCUTestStep> &otherstep,
+                 CWaitableCriticalSection &cs, std::condition_variable &cond,
+                 std::atomic<uint64_t> &syncRev) {
+    BOOST_CHECK(step == RCUTestStep::Init);
+
+    {
+        WAIT_LOCK(cs, lock);
+        step = RCUTestStep::Locked;
+
+        // Wait for our lock to be acknowledged.
+        WAIT_FOR_STEP(RCUTestStep::LockAck);
+
+        RCULock rculock;
+
+        // Update step.
+        step = RCUTestStep::RCULocked;
+
+        // Wait for master.
+        WAIT_FOR_STEP(RCUTestStep::RCULocked);
+    }
+
+    // Update step.
+    syncRev = RCUTest::getRevision() + 1;
+    step = RCUTestStep::Synchronizing;
+
+    BOOST_CHECK(!RCUTest::hasSynced(syncRev));
+
+    // We wait for readers.
+    RCULock::synchronize();
+
+    // Update step.
+    step = RCUTestStep::Synchronized;
+}
+
+void lockAndWaitForSynchronize(std::atomic<RCUTestStep> &step,
+                               const std::atomic<RCUTestStep> &otherstep,
+                               CWaitableCriticalSection &cs,
+                               std::condition_variable &cond,
+                               std::atomic<uint64_t> &syncRev) {
+    BOOST_CHECK(step == RCUTestStep::Init);
+    WAIT_LOCK(cs, lock);
+
+    // Wait for the other thread to be locked.
+    WAIT_FOR_STEP(RCUTestStep::Locked);
+    step = RCUTestStep::LockAck;
+
+    // Wait for the synchronizing thread to take its RCU lock.
+    WAIT_FOR_STEP(RCUTestStep::RCULocked);
+    BOOST_CHECK(!RCUTest::hasSynced(syncRev));
+
+    {
+        RCULock rculock;
+
+        // Update master step.
+        step = RCUTestStep::RCULocked;
+
+        while (RCUTest::getRevision() < syncRev) {
+            WAIT_FOR_STEP(RCUTestStep::Synchronizing);
+        }
+
+        BOOST_CHECK(RCUTest::getRevision() >= syncRev);
+        BOOST_CHECK_EQUAL(otherstep.load(), RCUTestStep::Synchronizing);
+    }
+
+    BOOST_CHECK(RCUTest::hasSynced(syncRev));
+    WAIT_FOR_STEP(RCUTestStep::Synchronized);
+}
+
+static const int COUNT = 128;
+
+BOOST_AUTO_TEST_CASE(synchronize_test) {
+    CWaitableCriticalSection cs;
+    std::condition_variable cond;
+    std::atomic<RCUTestStep> parentstep;
+    std::atomic<RCUTestStep> childstep;
+    std::atomic<uint64_t> syncRev;
+
+    for (int i = 0; i < COUNT; i++) {
+        parentstep = RCUTestStep::Init;
+        childstep = RCUTestStep::Init;
+        syncRev = RCUTest::getRevision() + 1;
+
+        std::thread tlock([&] {
+            lockAndWaitForSynchronize(parentstep, childstep, cs, cond, syncRev);
+        });
+
+        std::thread tsync(
+            [&] { synchronize(childstep, parentstep, cs, cond, syncRev); });
+
+        tlock.join();
+        tsync.join();
+    }
+}
+
+BOOST_AUTO_TEST_SUITE_END()