diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 844412465..d83271c0d 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -1,305 +1,313 @@ // Copyright (c) 2017-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include constexpr int64_t SYNC_LOG_INTERVAL = 30; // seconds constexpr int64_t SYNC_LOCATOR_WRITE_INTERVAL = 30; // seconds std::unique_ptr g_txindex; template static void FatalError(const char *fmt, const Args &... args) { std::string strMessage = tfm::format(fmt, args...); SetMiscWarning(strMessage); LogPrintf("*** %s\n", strMessage); uiInterface.ThreadSafeMessageBox( "Error: A fatal internal error occurred, see debug.log for details", "", CClientUIInterface::MSG_ERROR); StartShutdown(); } -TxIndex::TxIndex(std::unique_ptr db) - : m_db(std::move(db)), m_synced(false), m_best_block_index(nullptr) {} +TxIndex::TxIndex(std::unique_ptr db) : m_db(std::move(db)) {} -TxIndex::~TxIndex() { +BaseIndex::~BaseIndex() { Interrupt(); Stop(); } bool TxIndex::Init() { LOCK(cs_main); // Attempt to migrate txindex from the old database to the new one. Even if // chain_tip is null, the node could be reindexing and we still want to // delete txindex records in the old database. if (!m_db->MigrateData(*pblocktree, chainActive.GetLocator())) { return false; } + return BaseIndex::Init(); +} + +bool BaseIndex::Init() { CBlockLocator locator; - if (!m_db->ReadBestBlock(locator)) { + if (!GetDB().ReadBestBlock(locator)) { locator.SetNull(); } + LOCK(cs_main); m_best_block_index = FindForkInGlobalIndex(chainActive, locator); m_synced = m_best_block_index.load() == chainActive.Tip(); return true; } static const CBlockIndex *NextSyncBlock(const CBlockIndex *pindex_prev) { AssertLockHeld(cs_main); if (!pindex_prev) { return chainActive.Genesis(); } const CBlockIndex *pindex = chainActive.Next(pindex_prev); if (pindex) { return pindex; } return chainActive.Next(chainActive.FindFork(pindex_prev)); } -void TxIndex::ThreadSync() { +void BaseIndex::ThreadSync() { const CBlockIndex *pindex = m_best_block_index.load(); if (!m_synced) { auto &config = GetConfig(); int64_t last_log_time = 0; int64_t last_locator_write_time = 0; while (true) { if (m_interrupt) { WriteBestBlock(pindex); return; } { LOCK(cs_main); const CBlockIndex *pindex_next = NextSyncBlock(pindex); if (!pindex_next) { WriteBestBlock(pindex); m_best_block_index = pindex; m_synced = true; break; } pindex = pindex_next; } int64_t current_time = GetTime(); if (last_log_time + SYNC_LOG_INTERVAL < current_time) { LogPrintf("Syncing txindex with block chain from height %d\n", pindex->nHeight); last_log_time = current_time; } if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) { WriteBestBlock(pindex); last_locator_write_time = current_time; } CBlock block; if (!ReadBlockFromDisk(block, pindex, config)) { FatalError("%s: Failed to read block %s from disk", __func__, pindex->GetBlockHash().ToString()); return; } if (!WriteBlock(block, pindex)) { FatalError("%s: Failed to write block %s to tx index database", __func__, pindex->GetBlockHash().ToString()); return; } } } if (pindex) { LogPrintf("txindex is enabled at height %d\n", pindex->nHeight); } else { LogPrintf("txindex is enabled\n"); } } bool TxIndex::WriteBlock(const CBlock &block, const CBlockIndex *pindex) { CDiskTxPos pos(pindex->GetBlockPos(), GetSizeOfCompactSize(block.vtx.size())); std::vector> vPos; vPos.reserve(block.vtx.size()); for (const auto &tx : block.vtx) { vPos.emplace_back(tx->GetHash(), pos); pos.nTxOffset += ::GetSerializeSize(*tx, SER_DISK, CLIENT_VERSION); } return m_db->WriteTxs(vPos); } -bool TxIndex::WriteBestBlock(const CBlockIndex *block_index) { +BaseIndexDB &TxIndex::GetDB() const { + return *m_db; +} + +bool BaseIndex::WriteBestBlock(const CBlockIndex *block_index) { LOCK(cs_main); - if (!m_db->WriteBestBlock(chainActive.GetLocator(block_index))) { + if (!GetDB().WriteBestBlock(chainActive.GetLocator(block_index))) { return error("%s: Failed to write locator to disk", __func__); } return true; } -void TxIndex::BlockConnected( +void BaseIndex::BlockConnected( const std::shared_ptr &block, const CBlockIndex *pindex, const std::vector &txn_conflicted) { if (!m_synced) { return; } const CBlockIndex *best_block_index = m_best_block_index.load(); if (!best_block_index) { if (pindex->nHeight != 0) { FatalError("%s: First block connected is not the genesis block " "(height=%d)", __func__, pindex->nHeight); return; } } else { // Ensure block connects to an ancestor of the current best block. This // should be the case most of the time, but may not be immediately after // the the sync thread catches up and sets m_synced. Consider the case // where there is a reorg and the blocks on the stale branch are in the // ValidationInterface queue backlog even after the sync thread has // caught up to the new chain tip. In this unlikely event, log a warning // and let the queue clear. if (best_block_index->GetAncestor(pindex->nHeight - 1) != pindex->pprev) { LogPrintf("%s: WARNING: Block %s does not connect to an ancestor " "of known best chain (tip=%s); not updating txindex\n", __func__, pindex->GetBlockHash().ToString(), best_block_index->GetBlockHash().ToString()); return; } } if (WriteBlock(*block, pindex)) { m_best_block_index = pindex; } else { FatalError("%s: Failed to write block %s to txindex", __func__, pindex->GetBlockHash().ToString()); return; } } -void TxIndex::ChainStateFlushed(const CBlockLocator &locator) { +void BaseIndex::ChainStateFlushed(const CBlockLocator &locator) { if (!m_synced) { return; } const uint256 &locator_tip_hash = locator.vHave.front(); const CBlockIndex *locator_tip_index; { LOCK(cs_main); locator_tip_index = LookupBlockIndex(locator_tip_hash); } if (!locator_tip_index) { FatalError("%s: First block (hash=%s) in locator was not found", __func__, locator_tip_hash.ToString()); return; } // This checks that ChainStateFlushed callbacks are received after // BlockConnected. The check may fail immediately after the the sync thread // catches up and sets m_synced. Consider the case where there is a reorg // and the blocks on the stale branch are in the ValidationInterface queue // backlog even after the sync thread has caught up to the new chain tip. In // this unlikely event, log a warning and let the queue clear. const CBlockIndex *best_block_index = m_best_block_index.load(); if (best_block_index->GetAncestor(locator_tip_index->nHeight) != locator_tip_index) { LogPrintf("%s: WARNING: Locator contains block (hash=%s) not on known " "best chain (tip=%s); not writing txindex locator\n", __func__, locator_tip_hash.ToString(), best_block_index->GetBlockHash().ToString()); return; } - if (!m_db->WriteBestBlock(locator)) { + if (!GetDB().WriteBestBlock(locator)) { error("%s: Failed to write locator to disk", __func__); } } -bool TxIndex::BlockUntilSyncedToCurrentChain() { +bool BaseIndex::BlockUntilSyncedToCurrentChain() { AssertLockNotHeld(cs_main); if (!m_synced) { return false; } { // Skip the queue-draining stuff if we know we're caught up with // chainActive.Tip(). LOCK(cs_main); const CBlockIndex *chain_tip = chainActive.Tip(); const CBlockIndex *best_block_index = m_best_block_index.load(); if (best_block_index->GetAncestor(chain_tip->nHeight) == chain_tip) { return true; } } LogPrintf("%s: txindex is catching up on block notifications\n", __func__); SyncWithValidationInterfaceQueue(); return true; } bool TxIndex::FindTx(const uint256 &tx_hash, uint256 &block_hash, CTransactionRef &tx) const { CDiskTxPos postx; if (!m_db->ReadTxPos(tx_hash, postx)) { return false; } CAutoFile file(OpenBlockFile(postx, true), SER_DISK, CLIENT_VERSION); if (file.IsNull()) { return error("%s: OpenBlockFile failed", __func__); } CBlockHeader header; try { file >> header; fseek(file.Get(), postx.nTxOffset, SEEK_CUR); file >> tx; } catch (const std::exception &e) { return error("%s: Deserialize or I/O error - %s", __func__, e.what()); } if (tx->GetHash() != tx_hash) { return error("%s: txid mismatch", __func__); } block_hash = header.GetHash(); return true; } -void TxIndex::Interrupt() { +void BaseIndex::Interrupt() { m_interrupt(); } -void TxIndex::Start() { +void BaseIndex::Start() { // Need to register this ValidationInterface before running Init(), so that // callbacks are not missed if Init sets m_synced to true. RegisterValidationInterface(this); if (!Init()) { FatalError("%s: txindex failed to initialize", __func__); return; } m_thread_sync = std::thread(&TraceThread>, "txindex", - std::bind(&TxIndex::ThreadSync, this)); + std::bind(&BaseIndex::ThreadSync, this)); } -void TxIndex::Stop() { +void BaseIndex::Stop() { UnregisterValidationInterface(this); if (m_thread_sync.joinable()) { m_thread_sync.join(); } } diff --git a/src/index/txindex.h b/src/index/txindex.h index 22db753a2..009047382 100644 --- a/src/index/txindex.h +++ b/src/index/txindex.h @@ -1,98 +1,119 @@ // Copyright (c) 2017-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_INDEX_TXINDEX_H #define BITCOIN_INDEX_TXINDEX_H #include #include #include #include #include #include class CBlockIndex; /** - * TxIndex is used to look up transactions included in the blockchain by hash. - * The index is written to a LevelDB database and records the filesystem - * location of each transaction by transaction hash. + * Base class for indices of blockchain data. This implements + * CValidationInterface and ensures blocks are indexed sequentially according + * to their position in the active chain. */ -class TxIndex final : public CValidationInterface { +class BaseIndex : public CValidationInterface { private: - const std::unique_ptr m_db; - /// Whether the index is in sync with the main chain. The flag is flipped /// from false to true once, after which point this starts processing /// ValidationInterface notifications to stay in sync. - std::atomic m_synced; + std::atomic m_synced{false}; - /// The last block in the chain that the TxIndex is in sync with. - std::atomic m_best_block_index; + /// The last block in the chain that the index is in sync with. + std::atomic m_best_block_index{nullptr}; std::thread m_thread_sync; CThreadInterrupt m_interrupt; - /// Initialize internal state from the database and block index. - bool Init(); - - /// Sync the tx index with the block index starting from the current best + /// Sync the index with the block index starting from the current best /// block. Intended to be run in its own thread, m_thread_sync, and can be - /// interrupted with m_interrupt. Once the txindex gets in sync, the - /// m_synced flag is set and the BlockConnected ValidationInterface callback - /// takes over and the sync thread exits. + /// interrupted with m_interrupt. Once the index gets in sync, the m_synced + /// flag is set and the BlockConnected ValidationInterface callback takes + /// over and the sync thread exits. void ThreadSync(); - /// Write update index entries for a newly connected block. - bool WriteBlock(const CBlock &block, const CBlockIndex *pindex); - /// Write the current chain block locator to the DB. bool WriteBestBlock(const CBlockIndex *block_index); protected: void BlockConnected(const std::shared_ptr &block, const CBlockIndex *pindex, const std::vector &txn_conflicted) override; void ChainStateFlushed(const CBlockLocator &locator) override; -public: - /// Constructs the TxIndex, which becomes available to be queried. - explicit TxIndex(std::unique_ptr db); + /// Initialize internal state from the database and block index. + virtual bool Init(); + /// Write update index entries for a newly connected block. + virtual bool WriteBlock(const CBlock &block, const CBlockIndex *pindex) { + return true; + } + + virtual BaseIndexDB &GetDB() const = 0; + +public: /// Destructor interrupts sync thread if running and blocks until it exits. - ~TxIndex(); + virtual ~BaseIndex(); - /// Blocks the current thread until the transaction index is caught up to - /// the current state of the block chain. This only blocks if the index has - /// gotten in sync once and only needs to process blocks in the - /// ValidationInterface queue. If the index is catching up from far behind, - /// this method does not block and immediately returns false. + /// Blocks the current thread until the index is caught up to the current + /// state of the block chain. This only blocks if the index has gotten in + /// sync once and only needs to process blocks in the ValidationInterface + /// queue. If the index is catching up from far behind, this method does + /// not block and immediately returns false. bool BlockUntilSyncedToCurrentChain(); + void Interrupt(); + + /// Start initializes the sync state and registers the instance as a + /// ValidationInterface so that it stays in sync with blockchain updates. + void Start(); + + /// Stops the instance from staying in sync with blockchain updates. + void Stop(); +}; + +/** + * TxIndex is used to look up transactions included in the blockchain by hash. + * The index is written to a LevelDB database and records the filesystem + * location of each transaction by transaction hash. + */ +class TxIndex final : public BaseIndex { +private: + const std::unique_ptr m_db; + +protected: + /// Override base class init to migrate from old database. + bool Init() override; + + bool WriteBlock(const CBlock &block, const CBlockIndex *pindex) override; + + BaseIndexDB &GetDB() const override; + +public: + /// Constructs the index, which becomes available to be queried. + explicit TxIndex(std::unique_ptr db); + /// Look up a transaction by hash. /// /// @param[in] tx_hash The hash of the transaction to be returned. /// @param[out] block_hash The hash of the block the transaction is found /// in. /// @param[out] tx The transaction itself. /// @return true if transaction is found, false otherwise bool FindTx(const uint256 &tx_hash, uint256 &block_hash, CTransactionRef &tx) const; - - void Interrupt(); - - /// Start initializes the sync state and registers the instance as a - /// ValidationInterface so that it stays in sync with blockchain updates. - void Start(); - - /// Stops the instance from staying in sync with blockchain updates. - void Stop(); }; /// The global transaction index, used in GetTransaction. May be null. extern std::unique_ptr g_txindex; #endif // BITCOIN_INDEX_TXINDEX_H