diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -331,6 +331,7 @@ globals.cpp httprpc.cpp httpserver.cpp + index/txindex.cpp init.cpp interfaces/handler.cpp interfaces/node.cpp diff --git a/src/Makefile.am b/src/Makefile.am --- a/src/Makefile.am +++ b/src/Makefile.am @@ -140,6 +140,7 @@ globals.h \ httprpc.h \ httpserver.h \ + index/txindex.h \ indirectmap.h \ init.h \ interfaces/handler.h \ @@ -250,6 +251,7 @@ globals.cpp \ httprpc.cpp \ httpserver.cpp \ + index/txindex.cpp \ init.cpp \ interfaces/handler.cpp \ interfaces/node.cpp \ diff --git a/src/index/txindex.h b/src/index/txindex.h new file mode 100644 --- /dev/null +++ b/src/index/txindex.h @@ -0,0 +1,87 @@ +// Copyright (c) 2017-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_INDEX_TXINDEX_H +#define BITCOIN_INDEX_TXINDEX_H + +#include +#include +#include +#include +#include + +class CBlockIndex; + +/** + * TxIndex is used to look up transactions included in the blockchain by hash. + * The index is written to a LevelDB database and records the filesystem + * location of each transaction by transaction hash. + */ +class TxIndex final : public CValidationInterface { +private: + const std::unique_ptr m_db; + + /// Whether the index is in sync with the main chain. The flag is flipped + /// from false to true once, after which point this starts processing + /// ValidationInterface notifications to stay in sync. + std::atomic m_synced; + + /// The last block in the chain that the TxIndex is in sync with. + std::atomic m_best_block_index; + + /// Thread that runs ThreadSync; started in Start() and joined in Stop(). + std::thread m_thread_sync; + /// Interrupt flag raised by Interrupt() and polled by ThreadSync. + CThreadInterrupt m_interrupt; + + /// Initialize internal state from the database and block index. + bool Init(); + + /// Sync the tx index with the block index starting from the current best + /// block. 
Intended to be run in its own thread, m_thread_sync, and can be + /// interrupted with m_interrupt. Once the txindex gets in sync, the + /// m_synced flag is set and the BlockConnected ValidationInterface callback + /// takes over and the sync thread exits. + /// NOTE(review): in the sync loop the periodic WriteBestBlock(pindex) runs + /// before WriteBlock for that same block - confirm a restart in between + /// cannot leave that block unindexed. + void ThreadSync(); + + /// Write update index entries for a newly connected block. + bool WriteBlock(const CBlock &block, const CBlockIndex *pindex); + + /// Write the current chain block locator to the DB. + bool WriteBestBlock(const CBlockIndex *block_index); + +protected: + void + BlockConnected(const std::shared_ptr &block, + const CBlockIndex *pindex, + const std::vector &txn_conflicted) override; + + void SetBestChain(const CBlockLocator &locator) override; + +public: + /// Constructs the TxIndex, which becomes available to be queried. + explicit TxIndex(std::unique_ptr db); + + /// Destructor interrupts sync thread if running and blocks until it exits. + ~TxIndex(); + + /// Blocks the current thread until the transaction index is caught up to + /// the current state of the block chain. This only blocks if the index has + /// gotten in sync once and only needs to process blocks in the + /// ValidationInterface queue. If the index is catching up from far behind, + /// this method does not block and immediately returns false. + bool BlockUntilSyncedToCurrentChain(); + + /// Look up the on-disk location of a transaction by hash. + bool FindTx(const uint256 &txid, CDiskTxPos &pos) const; + + /// Raises m_interrupt, which ThreadSync checks at the top of every sync + /// loop iteration before returning. + void Interrupt(); + + /// Start initializes the sync state and registers the instance as a + /// ValidationInterface so that it stays in sync with blockchain updates. + void Start(); + + /// Stops the instance from staying in sync with blockchain updates. 
+ void Stop(); +}; + +#endif // BITCOIN_INDEX_TXINDEX_H diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp new file mode 100644 --- /dev/null +++ b/src/index/txindex.cpp @@ -0,0 +1,281 @@ +// Copyright (c) 2017-2018 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include +#include +#include +#include +#include +#include +#include +#include + +constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds +constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds + +/// Formats the message, stores it via SetMiscWarning, logs it, reports a +/// generic error through uiInterface.ThreadSafeMessageBox and then calls +/// StartShutdown(). +template +static void FatalError(const char *fmt, const Args &... args) { + std::string strMessage = tfm::format(fmt, args...); + SetMiscWarning(strMessage); + LogPrintf("*** %s\n", strMessage); + uiInterface.ThreadSafeMessageBox( + "Error: A fatal internal error occurred, see debug.log for details", "", + CClientUIInterface::MSG_ERROR); + StartShutdown(); +} + +TxIndex::TxIndex(std::unique_ptr db) + : m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr) {} + +TxIndex::~TxIndex() { + Interrupt(); + Stop(); +} + +bool TxIndex::Init() { + LOCK(cs_main); + + // Attempt to migrate txindex from the old database to the new one. Even if + // chain_tip is null, the node could be reindexing and we still want to + // delete txindex records in the old database. 
+ if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) { + return false; + } + + CBlockLocator locator; + if (!m_db->ReadBestBlock(locator)) { + locator.SetNull(); + } + + m_best_block_index = FindForkInGlobalIndex(chainActive, locator); + m_synced = m_best_block_index.load() == chainActive.Tip(); + return true; +} + +static const CBlockIndex *NextSyncBlock(const CBlockIndex *pindex_prev) { + AssertLockHeld(cs_main); + + if (!pindex_prev) { + return chainActive.Genesis(); + } + + const CBlockIndex *pindex = chainActive.Next(pindex_prev); + if (pindex) { + return pindex; + } + + return chainActive.Next(chainActive.FindFork(pindex_prev)); +} + +void TxIndex::ThreadSync() { + const CBlockIndex *pindex = m_best_block_index.load(); + if (!m_synced) { + auto &config = GetConfig(); + + int64_t last_log_time = 0; + int64_t last_locator_write_time = 0; + while (true) { + if (m_interrupt) { + WriteBestBlock(pindex); + return; + } + + { + LOCK(cs_main); + const CBlockIndex *pindex_next = NextSyncBlock(pindex); + if (!pindex_next) { + WriteBestBlock(pindex); + m_best_block_index = pindex; + m_synced = true; + break; + } + pindex = pindex_next; + } + + int64_t current_time = GetTime(); + if (last_log_time + SYNC_LOG_INTERVAL < current_time) { + LogPrintf("Syncing txindex with block chain from height %d\n", + pindex->nHeight); + last_log_time = current_time; + } + + if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < + current_time) { + WriteBestBlock(pindex); + last_locator_write_time = current_time; + } + + CBlock block; + if (!ReadBlockFromDisk(block, pindex, config)) { + FatalError("%s: Failed to read block %s from disk", __func__, + pindex->GetBlockHash().ToString()); + return; + } + if (!WriteBlock(block, pindex)) { + FatalError("%s: Failed to write block %s to tx index database", + __func__, pindex->GetBlockHash().ToString()); + return; + } + } + } + + if (pindex) { + LogPrintf("txindex is enabled at height %d\n", pindex->nHeight); + } else { + 
LogPrintf("txindex is enabled\n"); + } +} + +bool TxIndex::WriteBlock(const CBlock &block, const CBlockIndex *pindex) { + CDiskTxPos pos(pindex->GetBlockPos(), + GetSizeOfCompactSize(block.vtx.size())); + std::vector> vPos; + vPos.reserve(block.vtx.size()); + for (const auto &tx : block.vtx) { + vPos.emplace_back(tx->GetHash(), pos); + pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION); + } + return m_db->WriteTxs(vPos); +} + +bool TxIndex::WriteBestBlock(const CBlockIndex *block_index) { + LOCK(cs_main); + if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) { + return error("%s: Failed to write locator to disk", __func__); + } + return true; +} + +void TxIndex::BlockConnected( + const std::shared_ptr &block, const CBlockIndex *pindex, + const std::vector &txn_conflicted) { + if (!m_synced) { + return; + } + + const CBlockIndex *best_block_index = m_best_block_index.load(); + if (!best_block_index) { + if (pindex->nHeight != 0) { + FatalError("%s: First block connected is not the genesis block " + "(height=%d)", + __func__, pindex->nHeight); + return; + } + } else { + // Ensure block connects to an ancestor of the current best block. This + // should be the case most of the time, but may not be immediately after + // the sync thread catches up and sets m_synced. Consider the case + // where there is a reorg and the blocks on the stale branch are in the + // ValidationInterface queue backlog even after the sync thread has + // caught up to the new chain tip. In this unlikely event, log a warning + // and let the queue clear. 
+ if (best_block_index->GetAncestor(pindex->nHeight - 1) != + pindex->pprev) { + LogPrintf("%s: WARNING: Block %s does not connect to an ancestor " + "of known best chain (tip=%s); not updating txindex\n", + __func__, pindex->GetBlockHash().ToString(), + best_block_index->GetBlockHash().ToString()); + return; + } + } + + if (WriteBlock(*block, pindex)) { + m_best_block_index = pindex; + } else { + FatalError("%s: Failed to write block %s to txindex", __func__, + pindex->GetBlockHash().ToString()); + return; + } +} + +void TxIndex::SetBestChain(const CBlockLocator &locator) { + if (!m_synced) { + return; + } + + const uint256 &locator_tip_hash = locator.vHave.front(); + const CBlockIndex *locator_tip_index; + { + LOCK(cs_main); + locator_tip_index = LookupBlockIndex(locator_tip_hash); + } + + if (!locator_tip_index) { + FatalError("%s: First block (hash=%s) in locator was not found", + __func__, locator_tip_hash.ToString()); + return; + } + + // This checks that SetBestChain callbacks are received after + // BlockConnected. The check may fail immediately after the sync thread + // catches up and sets m_synced. Consider the case where there is a reorg + // and the blocks on the stale branch are in the ValidationInterface queue + // backlog even after the sync thread has caught up to the new chain tip. In + // this unlikely event, log a warning and let the queue clear. + // NOTE(review): best_block_index is dereferenced below without a null + // check, unlike in BlockConnected - confirm it cannot still be null here. 
+ const CBlockIndex *best_block_index = m_best_block_index.load(); + if (best_block_index->GetAncestor(locator_tip_index->nHeight) != + locator_tip_index) { + LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known " + "best chain (tip=%s); not writing txindex locator\n", + __func__, locator_tip_hash.ToString(), + best_block_index->GetBlockHash().ToString()); + return; + } + + if (!m_db->WriteBestBlock(locator)) { + error("%s: Failed to write locator to disk", __func__); + } +} + +bool TxIndex::BlockUntilSyncedToCurrentChain() { + AssertLockNotHeld(cs_main); + + if (!m_synced) { + return false; + } + + { + // Skip the queue-draining stuff if we know we're caught up with + // chainActive.Tip(). + // NOTE(review): chain_tip and best_block_index are both dereferenced + // without null checks - confirm neither can be null once m_synced is set. + LOCK(cs_main); + const CBlockIndex *chain_tip = chainActive.Tip(); + const CBlockIndex *best_block_index = m_best_block_index.load(); + if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) { + return true; + } + } + + LogPrintf("%s: txindex is catching up on block notifications\n", __func__); + SyncWithValidationInterfaceQueue(); + return true; +} + +bool TxIndex::FindTx(const uint256 &txid, CDiskTxPos &pos) const { + return m_db->ReadTxPos(txid, pos); +} + +void TxIndex::Interrupt() { + m_interrupt(); +} + +void TxIndex::Start() { + // Need to register this ValidationInterface before running Init(), so that + // callbacks are not missed if Init sets m_synced to true. 
+ RegisterValidationInterface(this); + if (!Init()) { + FatalError("%s: txindex failed to initialize", __func__); + return; + } + + m_thread_sync = std::thread(&TraceThread>, "txindex", + std::bind(&TxIndex::ThreadSync, this)); +} + +void TxIndex::Stop() { + UnregisterValidationInterface(this); + + if (m_thread_sync.joinable()) { + m_thread_sync.join(); + } +} diff --git a/src/threadinterrupt.h b/src/threadinterrupt.h --- a/src/threadinterrupt.h +++ b/src/threadinterrupt.h @@ -19,6 +19,7 @@ */ class CThreadInterrupt { public: + CThreadInterrupt(); explicit operator bool() const; void operator()(); void reset(); diff --git a/src/threadinterrupt.cpp b/src/threadinterrupt.cpp --- a/src/threadinterrupt.cpp +++ b/src/threadinterrupt.cpp @@ -7,6 +7,8 @@ #include +CThreadInterrupt::CThreadInterrupt() : flag(false) {} + CThreadInterrupt::operator bool() const { return flag.load(std::memory_order_acquire); } diff --git a/test/lint/lint-format-strings.py b/test/lint/lint-format-strings.py --- a/test/lint/lint-format-strings.py +++ b/test/lint/lint-format-strings.py @@ -15,7 +15,7 @@ FALSE_POSITIVES = [ ("src/dbwrapper.cpp", "vsnprintf(p, limit - p, format, backup_ap)"), - ("src/index/base.cpp", "FatalError(const char* fmt, const Args&... args)"), + ("src/index/txindex.cpp", "FatalError(const char *fmt, const Args &... args)"), ("src/netbase.cpp", "LogConnectFailure(bool manual_connection, const char* fmt, const Args&... args)"), ("src/util.cpp", "strprintf(_(COPYRIGHT_HOLDERS), _(COPYRIGHT_HOLDERS_SUBSTITUTION))"), ("src/util.cpp", "strprintf(COPYRIGHT_HOLDERS, COPYRIGHT_HOLDERS_SUBSTITUTION)"),