diff --git a/src/rcu.cpp b/src/rcu.cpp
index dd499c06d..c4aaffe61 100644
--- a/src/rcu.cpp
+++ b/src/rcu.cpp
@@ -1,199 +1,232 @@
 // Copyright (c) 2018 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 #include "rcu.h"
 #include "sync.h"

+#include <algorithm>
 #include <chrono>
 #include <condition_variable>

 std::atomic<uint64_t> RCUInfos::revision{0};

 thread_local RCUInfos RCUInfos::infos{};

 /**
  * How many times a busy loop runs before yielding.
  */
-static const int RCU_ACTIVE_LOOP_COUNT = 30;
+static const int RCU_ACTIVE_LOOP_COUNT = 10;

 /**
  * We maintain a linked list of the RCUInfos for each active thread. Upon
  * start, a new thread adds itself to the head of the linked list, and the
  * node is removed when the thread shuts down.
  *
  * Insertion is fairly straightforward. The first step is to set the next
  * pointer of the node being inserted to the first node in the list, as
  * follows:
  *
  *    threadInfos -> Node -> ...
  *                    ^
  *         Nadded ----|
  *
  * The second step is to update threadInfos to point to the inserted node.
  * This is done using compare and swap. If the head of the list changed
  * during this process - for instance due to another insertion - the CAS
  * will fail and we can start again.
  *
  *    threadInfos    Node -> ...
  *         |          ^
  *         \-> Nadded-|
  *
  * Deletion is a slightly more complex process. The general idea is to go
  * over the list, find the parent of the item we want to remove, and set its
  * next pointer to jump over it.
  *
  *    Nparent -> Ndelete -> Nchild
  *
  *    Nparent    Ndelete -> Nchild
  *       |                    ^
  *       \--------------------|
  *
  * We run into problems when a node is deleted concurrently with another
  * node being inserted. Fortunately, we can solve that problem with CAS as
  * well.
  *
  *    threadInfos -> Ndelete -> Nchild
  *                    ^
  *         Nadded ----|
  *
  * The insertion will try to update threadInfos to point to Nadded, while
  * the deletion will try to update it to point to Nchild. Whichever goes
  * first will cause the other to fail its CAS and restart its process.
  *
  *    threadInfos    Ndelete -> Nchild
  *         |           ^
  *         \--> Nadded-|
  *
  * After a successful insertion, threadInfos now points to Nadded, and the
  * CAS to move it to Nchild will fail, causing the deletion process to
  * restart from scratch.
  *
  *         /----------------------|
  *         |                      V
  *    threadInfos    Ndelete -> Nchild
  *                     ^
  *         Nadded -----|
  *
  * After a successful deletion, threadInfos now points to Nchild, and the
  * CAS to move it to Nadded will fail, causing the insertion process to
  * restart.
  *
  * We also run into problems when several nodes are deleted concurrently.
  * Because it is not possible to read Ndelete->next and update Nparent->next
  * atomically, we may end up setting Nparent->next to a stale value if
  * Nchild is deleted.
  *
  *          /----------------------|
  *          |                      V
  *    Nparent    Ndelete    Nchild -> Ngrandchild
  *       |                    ^
  *       \--------------------|
  *
  * This would cause Nchild to be 'resurrected', which is obviously a
  * problem. In order to avoid it, we make sure that no concurrent deletions
  * take place, using a good old mutex. Using a mutex for deletion also
  * ensures we are safe from the ABA problem.
  *
  * Once a node is deleted from the list, we cannot destroy it right away.
  * Readers do not hold the mutex and may still be using that node. We need
  * to leverage RCU to make sure all the readers have finished their work
  * before allowing the node to be destroyed. We need to keep the mutex
  * during that process, because we just removed our thread from the list of
  * threads to wait for. A concurrent deletion would not wait for us and may
  * end up deleting data we rely on as a result.
  */
 static std::atomic<RCUInfos *> threadInfos{nullptr};
 static CCriticalSection csThreadInfosDelete;
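For readers less familiar with the scheme described in the comment above, here is a minimal, self-contained sketch of the same idea: lock-free insertion at the head via CAS, deletion serialized by a mutex. All names are hypothetical and nothing below is part of the patch; node reclamation is left out, since that is exactly the part the real code defers to RCU.

#include <atomic>
#include <cassert>
#include <mutex>

struct Node {
    std::atomic<Node *> next{nullptr};
};

static std::atomic<Node *> listHead{nullptr};
static std::mutex deleteMutex;

// Insertion: point the new node at the current head, then CAS the head.
// If another thread won the race, head is reloaded and we retry.
void insert(Node *added) {
    Node *head = listHead.load();
    do {
        added->next.store(head);
    } while (!listHead.compare_exchange_weak(head, added));
}

// Deletion: serialized by a mutex so no two removals race each other,
// which also sidesteps the ABA problem mentioned above. The CAS still
// guards against a concurrent insertion at the head.
void remove(Node *target) {
    std::lock_guard<std::mutex> guard(deleteMutex);
    while (true) {
        // Find the pointer that currently leads to target.
        std::atomic<Node *> *ptr = &listHead;
        Node *current = ptr->load();
        while (current != target) {
            assert(current != nullptr);
            ptr = &current->next;
            current = ptr->load();
        }
        // Unlink; restart if a concurrent insertion changed *ptr under us.
        if (ptr->compare_exchange_strong(current, target->next.load())) {
            return;
        }
    }
}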
-RCUInfos::RCUInfos() : next(nullptr), state(0) {
+RCUInfos::RCUInfos() : state(0), next(nullptr) {
     RCUInfos *head = threadInfos.load();
     do {
         next.store(head);
     } while (!threadInfos.compare_exchange_weak(head, this));

     // Release the lock.
     readFree();
 }

 RCUInfos::~RCUInfos() {
+    /**
+     * Before the thread is removed from the list, make sure we clean up
+     * everything.
+     */
+    runCleanups();
+    while (cleanups.size() > 0) {
+        synchronize();
+    }
+
     while (true) {
         LOCK(csThreadInfosDelete);

         std::atomic<RCUInfos *> *ptr;
         {
             RCULock lock(this);
             ptr = &threadInfos;
             while (true) {
                 RCUInfos *current = ptr->load();
                 if (current == this) {
                     break;
                 }

                 assert(current != nullptr);
                 ptr = &current->next;
             }
         }

         /**
          * We have our node and the parent is ready to be updated.
          * NB: The CAS operation only checks *ptr and not next. This would
          * be a big problem in the general case, but because we only insert
          * at the tip of the list and cannot have concurrent deletions
          * thanks to the use of a mutex, we are safe.
          */
         RCUInfos *current = this;
         if (!ptr->compare_exchange_strong(current, next.load())) {
             continue;
         }

         /**
          * We now wait for possible readers to go past the synchronization
          * point. We need to do so while holding the lock, as this operation
          * requires us to be a reader, but we just removed ourselves from
          * the list of readers to check and may therefore not be waited for.
          */
         synchronize();
         break;
     }
 }

 void RCUInfos::synchronize() {
     uint64_t syncRev = ++revision;

     // Loop a few times lock-free.
     for (int i = 0; i < RCU_ACTIVE_LOOP_COUNT; i++) {
-        if (hasSynced(syncRev)) {
-            // Done!
+        runCleanups();
+        if (cleanups.empty() && hasSyncedTo(syncRev)) {
             return;
         }
     }

     // It seems like we have some contention. Let's try not to starve the
     // system and make sure threads that land here proceed one by one.
     // XXX: The best long-term option is most likely to use a futex on one
     // of the threads causing the synchronization delay, so this thread can
     // be woken up at an appropriate time.
     static std::condition_variable cond;
     static CWaitableCriticalSection cs;
     WAIT_LOCK(cs, lock);

     do {
+        runCleanups();
         cond.notify_one();
-    } while (!cond.wait_for(lock, std::chrono::microseconds(1),
-                            [&] { return hasSynced(syncRev); }));
+    } while (!cond.wait_for(lock, std::chrono::microseconds(1), [&] {
+        return cleanups.empty() && hasSyncedTo(syncRev);
+    }));
+}
+
+void RCUInfos::runCleanups() {
+    // By the time we run a set of cleanups, more cleanups may have become
+    // available, so we loop until there is nothing left to clean up.
+    while (true) {
+        if (cleanups.empty()) {
+            // There is nothing to clean up.
+            return;
+        }
+
+        auto it = cleanups.begin();
+        uint64_t syncedTo = hasSyncedTo(it->first);
+        while (it != cleanups.end() && it->first <= syncedTo) {
+            // Run the cleanup and remove it from the map.
+            it->second();
+            cleanups.erase(it++);
+        }
+    }
 }
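A note on runCleanups() above: erasing from a std::map while iterating is safe here because cleanups.erase(it++) advances the iterator before the element is erased, and std::map erasure only invalidates iterators to the erased element. A standalone sketch of the idiom, with hypothetical names:

#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

int main() {
    std::map<uint64_t, std::function<void()>> pending;
    for (uint64_t rev = 1; rev <= 4; rev++) {
        pending.emplace(rev, [rev] { std::cout << "cleanup " << rev << "\n"; });
    }

    // Pretend every revision up to 3 has been synced past.
    const uint64_t syncedTo = 3;

    auto it = pending.begin();
    while (it != pending.end() && it->first <= syncedTo) {
        it->second();
        // it++ yields the current iterator and advances first, so erase()
        // only invalidates an iterator we no longer use.
        pending.erase(it++);
    }

    // Only the cleanup registered at revision 4 remains.
    std::cout << pending.size() << "\n"; // prints 1
}

The outer while (true) in runCleanups() then re-checks the map, since a cleanup may itself have registered further cleanups.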
-bool RCUInfos::hasSynced(uint64_t syncRev) {
+uint64_t RCUInfos::hasSyncedTo(uint64_t cutoff) {
+    uint64_t syncedTo = revision.load();
+
     // Go over the list and check all threads are past the synchronization
     // point.
     RCULock lock(this);
     RCUInfos *current = threadInfos.load();
     while (current != nullptr) {
-        // If the current thread is not up to speed, bail.
-        if (current->state < syncRev) {
-            return false;
+        syncedTo = std::min(syncedTo, current->state.load());
+        if (syncedTo < cutoff) {
+            return 0;
         }

         current = current->next.load();
     }

-    return true;
+    return syncedTo;
 }
diff --git a/src/rcu.h b/src/rcu.h
index 4eaf7ffdf..7c736e97b 100644
--- a/src/rcu.h
+++ b/src/rcu.h
@@ -1,63 +1,75 @@
 // Copyright (c) 2018 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 #ifndef BITCOIN_RCU_H
 #define BITCOIN_RCU_H

 #include <boost/noncopyable.hpp>

 #include <atomic>
 #include <cassert>
 #include <cstdint>
+#include <functional>
+#include <map>

 class RCUInfos;
 class RCUReadLock;

 class RCUInfos {
-    std::atomic<RCUInfos *> next;
     std::atomic<uint64_t> state;
+    std::atomic<RCUInfos *> next;
+
+    std::map<uint64_t, std::function<void()>> cleanups;

     // The largest revision possible means unlocked.
     static const uint64_t UNLOCKED = -uint64_t(1);

     RCUInfos();
     ~RCUInfos();

     void readLock() {
         assert(!isLocked());
         state.store(revision.load());
     }

     void readFree() {
         assert(isLocked());
         state.store(UNLOCKED);
     }

     bool isLocked() const { return state.load() != UNLOCKED; }
+    void registerCleanup(const std::function<void()> &f) {
+        cleanups.emplace(++revision, f);
+    }

     void synchronize();
-    bool hasSynced(uint64_t syncRev);
+    void runCleanups();
+    uint64_t hasSyncedTo(uint64_t cutoff = UNLOCKED);

     friend class RCULock;
     friend struct RCUTest;

     static std::atomic<uint64_t> revision;
     static thread_local RCUInfos infos;
 };

 class RCULock : public boost::noncopyable {
     RCUInfos *infos;

     RCULock(RCUInfos *infosIn) : infos(infosIn) { infos->readLock(); }
     friend class RCUInfos;

 public:
     RCULock() : RCULock(&RCUInfos::infos) {}
     ~RCULock() { infos->readFree(); }

-    static void synchronize() { RCUInfos::infos.synchronize(); }
     static bool isLocked() { return RCUInfos::infos.isLocked(); }
+    static void registerCleanup(const std::function<void()> &f) {
+        RCUInfos::infos.registerCleanup(f);
+    }
+
+    static void synchronize() { RCUInfos::infos.synchronize(); }
 };

 #endif // BITCOIN_RCU_H
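To make the intended use of the new registerCleanup() entry point concrete, here is a minimal reader/writer sketch against the API declared above. The Config type and the two functions are hypothetical, not part of the patch:

#include "rcu.h"

#include <atomic>

struct Config {
    int value;
};

static std::atomic<Config *> currentConfig{new Config{42}};

// Reader: the RCULock read-side critical section guarantees that any
// object observed here will not be destroyed before the lock is released.
int readValue() {
    RCULock lock;
    return currentConfig.load()->value;
}

// Writer: publish the new object first, then defer destruction of the
// old one instead of blocking until all readers are done with it.
void updateValue(int v) {
    Config *oldConfig = currentConfig.exchange(new Config{v});
    RCULock::registerCleanup([oldConfig] { delete oldConfig; });
}

Note that cleanups are registered on the calling thread's RCUInfos, so the deferred delete runs when that thread later calls synchronize() (or exits), at which point every reader is known to have moved past the registration revision.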
diff --git a/src/test/rcu_tests.cpp b/src/test/rcu_tests.cpp
index d9f8f5e20..d1d405a99 100644
--- a/src/test/rcu_tests.cpp
+++ b/src/test/rcu_tests.cpp
@@ -1,133 +1,189 @@
 // Copyright (c) 2018 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 #include "rcu.h"
 #include "test/test_bitcoin.h"

 #include <boost/test/unit_test.hpp>
 #include <thread>

 struct RCUTest {
     static uint64_t getRevision() { return RCUInfos::revision.load(); }

-    static uint64_t hasSynced(uint64_t syncRev) {
-        return RCUInfos::infos.hasSynced(syncRev);
+    static uint64_t hasSyncedTo(uint64_t syncRev) {
+        return RCUInfos::infos.hasSyncedTo(syncRev);
+    }
+
+    static std::map<uint64_t, std::function<void()>> &getCleanups() {
+        return RCUInfos::infos.cleanups;
     }
 };

 BOOST_FIXTURE_TEST_SUITE(rcu_tests, BasicTestingSetup)

 enum RCUTestStep {
     Init,
     Locked,
     LockAck,
     RCULocked,
     Synchronizing,
     Synchronized,
 };

 #define WAIT_FOR_STEP(step)                                                   \
     do {                                                                      \
         cond.notify_all();                                                    \
     } while (!cond.wait_for(lock, std::chrono::milliseconds(1),              \
                             [&] { return otherstep == step; }))

 void synchronize(std::atomic<RCUTestStep> &step,
                  const std::atomic<RCUTestStep> &otherstep,
                  CWaitableCriticalSection &cs, std::condition_variable &cond,
                  std::atomic<uint64_t> &syncRev) {
     assert(step == RCUTestStep::Init);

     {
         WAIT_LOCK(cs, lock);
         step = RCUTestStep::Locked;

         // Wait for our lock to be acknowledged.
         WAIT_FOR_STEP(RCUTestStep::LockAck);

         RCULock rculock;

         // Update step.
         step = RCUTestStep::RCULocked;

         // Wait for master.
         WAIT_FOR_STEP(RCUTestStep::RCULocked);
     }

     // Update step.
     syncRev = RCUTest::getRevision() + 1;
     step = RCUTestStep::Synchronizing;

-    assert(!RCUTest::hasSynced(syncRev));
+    assert(!RCUTest::hasSyncedTo(syncRev));

     // We wait for readers.
     RCULock::synchronize();

     // Update step.
     step = RCUTestStep::Synchronized;
 }

 void lockAndWaitForSynchronize(std::atomic<RCUTestStep> &step,
                                const std::atomic<RCUTestStep> &otherstep,
                                CWaitableCriticalSection &cs,
                                std::condition_variable &cond,
                                std::atomic<uint64_t> &syncRev) {
     assert(step == RCUTestStep::Init);
     WAIT_LOCK(cs, lock);

     // Wait for the other thread to be locked.
     WAIT_FOR_STEP(RCUTestStep::Locked);
     step = RCUTestStep::LockAck;

     // Wait for the synchronizing thread to take its RCU lock.
     WAIT_FOR_STEP(RCUTestStep::RCULocked);
-    assert(!RCUTest::hasSynced(syncRev));
+    assert(!RCUTest::hasSyncedTo(syncRev));

     {
         RCULock rculock;

         // Update master step.
         step = RCUTestStep::RCULocked;

         while (RCUTest::getRevision() < syncRev) {
             WAIT_FOR_STEP(RCUTestStep::Synchronizing);
         }

         assert(RCUTest::getRevision() >= syncRev);
         assert(otherstep.load() == RCUTestStep::Synchronizing);
     }

-    assert(RCUTest::hasSynced(syncRev));
+    assert(RCUTest::hasSyncedTo(syncRev) >= syncRev);
     WAIT_FOR_STEP(RCUTestStep::Synchronized);
 }

 static const int COUNT = 128;

 BOOST_AUTO_TEST_CASE(synchronize_test) {
     CWaitableCriticalSection cs;
     std::condition_variable cond;
     std::atomic<RCUTestStep> parentstep;
     std::atomic<RCUTestStep> childstep;
     std::atomic<uint64_t> syncRev;

     for (int i = 0; i < COUNT; i++) {
         parentstep = RCUTestStep::Init;
         childstep = RCUTestStep::Init;
         syncRev = RCUTest::getRevision() + 1;

         std::thread tlock([&] {
             lockAndWaitForSynchronize(parentstep, childstep, cs, cond,
                                       syncRev);
         });

         std::thread tsync(
             [&] { synchronize(childstep, parentstep, cs, cond, syncRev); });

         tlock.join();
         tsync.join();
     }
 }

+BOOST_AUTO_TEST_CASE(cleanup_test) {
+    BOOST_CHECK(RCUTest::getCleanups().empty());
+
+    bool isClean1 = false;
+    RCULock::registerCleanup([&] { isClean1 = true; });
+
+    BOOST_CHECK(!isClean1);
+    BOOST_CHECK_EQUAL(RCUTest::getCleanups().size(), 1);
+    BOOST_CHECK_EQUAL(RCUTest::getRevision(),
+                      RCUTest::getCleanups().begin()->first);
+
+    // Synchronize runs the cleanups.
+    RCULock::synchronize();
+    BOOST_CHECK(RCUTest::getCleanups().empty());
+    BOOST_CHECK(isClean1);
+
+    // Check multiple callbacks.
+    isClean1 = false;
+    bool isClean2 = false;
+    bool isClean3 = false;
+    RCULock::registerCleanup([&] { isClean1 = true; });
+    RCULock::registerCleanup([&] { isClean2 = true; });
+    RCULock::registerCleanup([&] { isClean3 = true; });
+
+    BOOST_CHECK_EQUAL(RCUTest::getCleanups().size(), 3);
+    RCULock::synchronize();
+    BOOST_CHECK(RCUTest::getCleanups().empty());
+    BOOST_CHECK(isClean1);
+    BOOST_CHECK(isClean2);
+    BOOST_CHECK(isClean3);
+
+    // Check callbacks registering other callbacks.
+    isClean1 = false;
+    isClean2 = false;
+    isClean3 = false;
+
+    RCULock::registerCleanup([&] {
+        isClean1 = true;
+        RCULock::registerCleanup([&] {
+            isClean2 = true;
+            RCULock::registerCleanup([&] { isClean3 = true; });
+        });
+    });
+
+    BOOST_CHECK_EQUAL(RCUTest::getCleanups().size(), 1);
+    RCULock::synchronize();
+    BOOST_CHECK(RCUTest::getCleanups().empty());
+    BOOST_CHECK(isClean1);
+    BOOST_CHECK(isClean2);
+    BOOST_CHECK(isClean3);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
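Tying the pieces together: the unlink-then-reclaim pattern from the rcu.cpp comment combines naturally with the new primitive. A hypothetical sketch, reusing the Node list from the first example, of a removal that never blocks waiting for readers:

// Hypothetical non-blocking removal: unlink as before, then let RCU
// destroy the node once a later synchronize() has observed every reader
// going past a synchronization point.
void removeAndReclaim(Node *target) {
    remove(target); // unlink under the deletion mutex, as sketched earlier
    RCULock::registerCleanup([target] { delete target; });
}

As the last test above demonstrates, cleanups registered from within a cleanup are still drained, because synchronize() keeps running runCleanups() until the map is empty.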