Page MenuHomePhabricator

D15465.id45297.diff
No OneTemporary

D15465.id45297.diff

diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -173,6 +173,12 @@
"syn 2.0.29",
]
+[[package]]
+name = "bit-vec"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
+
[[package]]
name = "bitcoinsuite-core"
version = "0.1.0"
@@ -222,6 +228,17 @@
"generic-array",
]
+[[package]]
+name = "bloomfilter"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934"
+dependencies = [
+ "bit-vec",
+ "getrandom",
+ "siphasher",
+]
+
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -292,6 +309,7 @@
"bimap",
"bitcoinsuite-core",
"bitcoinsuite-slp",
+ "bloomfilter",
"bytes",
"chronik-util",
"fastrand",
@@ -1623,6 +1641,12 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
+[[package]]
+name = "siphasher"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe"
+
[[package]]
name = "slab"
version = "0.4.9"
diff --git a/chronik/chronik-cpp/chronik.h b/chronik/chronik-cpp/chronik.h
--- a/chronik/chronik-cpp/chronik.h
+++ b/chronik/chronik-cpp/chronik.h
@@ -17,6 +17,11 @@
static const std::vector<std::string> DEFAULT_BINDS = {"127.0.0.1", "::1"};
+static const bool DEFAULT_SCRIPT_HISTORY_BLOOM_IS_ENABLED = true;
+static const double DEFAULT_SCRIPT_HISTORY_BLOOM_FALSE_POSITIVE_RATE = 0.90;
+static const size_t DEFAULT_SCRIPT_HISTORY_BLOOM_EXPECTED_NUM_ITEMS =
+ 100'000'000;
+
// Registers Chronik indexer as ValidationInterface, listens to HTTP queries
bool Start(const Config &config, const node::NodeContext &node, bool fWipe);
diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp
--- a/chronik/chronik-cpp/chronik.cpp
+++ b/chronik/chronik-cpp/chronik.cpp
@@ -57,6 +57,17 @@
params.NetworkIDString() == CBaseChainParams::REGTEST
? uint64_t(count_seconds(WS_PING_INTERVAL_REGTEST))
: uint64_t(count_seconds(WS_PING_INTERVAL_DEFAULT)),
+ .script_history =
+ {
+ .is_bloom_enabled = gArgs.GetBoolArg(
+ "-chronikscripthistorybloomfilter",
+ DEFAULT_SCRIPT_HISTORY_BLOOM_IS_ENABLED),
+ .false_positive_rate =
+ DEFAULT_SCRIPT_HISTORY_BLOOM_FALSE_POSITIVE_RATE,
+ .expected_num_items = (size_t)gArgs.GetIntArg(
+ "-chronikscripthistoryexpectednumitems",
+ DEFAULT_SCRIPT_HISTORY_BLOOM_EXPECTED_NUM_ITEMS),
+ },
},
config, node);
}
diff --git a/chronik/chronik-cpp/chronik_validationinterface.cpp b/chronik/chronik-cpp/chronik_validationinterface.cpp
--- a/chronik/chronik-cpp/chronik_validationinterface.cpp
+++ b/chronik/chronik-cpp/chronik_validationinterface.cpp
@@ -23,7 +23,10 @@
void Register() { RegisterValidationInterface(this); }
- void Unregister() { UnregisterValidationInterface(this); }
+ void Shutdown() {
+ UnregisterValidationInterface(this);
+ m_chronik->shutdown();
+ }
private:
rust::Box<chronik_bridge::Chronik> m_chronik;
@@ -76,7 +79,7 @@
void StopChronikValidationInterface() {
if (g_chronik_validation_interface) {
- g_chronik_validation_interface->Unregister();
+ g_chronik_validation_interface->Shutdown();
// Reset so the Box is dropped and all handles are released.
g_chronik_validation_interface.reset();
}
diff --git a/chronik/chronik-db/Cargo.toml b/chronik/chronik-db/Cargo.toml
--- a/chronik/chronik-db/Cargo.toml
+++ b/chronik/chronik-db/Cargo.toml
@@ -22,6 +22,8 @@
# Efficient byte strings, with ref counted substrings
bytes = "1.4"
+bloomfilter = "1.0"
+
# En-/decode byte strings from/to hex
hex = "0.4"
diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs
--- a/chronik/chronik-db/src/db.rs
+++ b/chronik/chronik-db/src/db.rs
@@ -45,12 +45,16 @@
pub const CF_SCRIPT_HISTORY: &str = "script_history";
/// Column family to store number of txs by script.
pub const CF_SCRIPT_HISTORY_NUM_TXS: &str = "script_history_num_txs";
+/// Column family to store cache data for the script history
+pub const CF_SCRIPT_HISTORY_CACHE: &str = "script_history_cache";
/// Column family for utxos by script.
pub const CF_SCRIPT_UTXO: &str = "script_utxo";
/// Column family to store tx history by token ID.
pub const CF_TOKEN_ID_HISTORY: &str = "token_id_history";
/// Column family to store number of txs by token ID.
pub const CF_TOKEN_ID_HISTORY_NUM_TXS: &str = "token_id_history_num_txs";
+/// Column family to store cache data for token ID history
+pub const CF_TOKEN_ID_HISTORY_CACHE: &str = "token_id_history_cache";
/// Column family for utxos by token ID.
pub const CF_TOKEN_ID_UTXO: &str = "token_id_utxo";
/// Column family to store which outputs have been spent by which tx inputs.
diff --git a/chronik/chronik-db/src/groups/script.rs b/chronik/chronik-db/src/groups/script.rs
--- a/chronik/chronik-db/src/groups/script.rs
+++ b/chronik/chronik-db/src/groups/script.rs
@@ -6,7 +6,10 @@
use bytes::Bytes;
use crate::{
- db::{CF_SCRIPT_HISTORY, CF_SCRIPT_HISTORY_NUM_TXS, CF_SCRIPT_UTXO},
+ db::{
+ CF_SCRIPT_HISTORY, CF_SCRIPT_HISTORY_CACHE, CF_SCRIPT_HISTORY_NUM_TXS,
+ CF_SCRIPT_UTXO,
+ },
group::{Group, GroupQuery, MemberItem, UtxoDataValue},
io::{
GroupHistoryConf, GroupHistoryReader, GroupHistoryWriter,
@@ -84,6 +87,7 @@
GroupHistoryConf {
cf_page_name: CF_SCRIPT_HISTORY,
cf_num_txs_name: CF_SCRIPT_HISTORY_NUM_TXS,
+ cf_cache_name: CF_SCRIPT_HISTORY_CACHE,
page_size: 1000,
}
}
diff --git a/chronik/chronik-db/src/groups/token_id.rs b/chronik/chronik-db/src/groups/token_id.rs
--- a/chronik/chronik-db/src/groups/token_id.rs
+++ b/chronik/chronik-db/src/groups/token_id.rs
@@ -10,7 +10,8 @@
use crate::{
db::{
- Db, CF_TOKEN_ID_HISTORY, CF_TOKEN_ID_HISTORY_NUM_TXS, CF_TOKEN_ID_UTXO,
+ Db, CF_TOKEN_ID_HISTORY, CF_TOKEN_ID_HISTORY_CACHE,
+ CF_TOKEN_ID_HISTORY_NUM_TXS, CF_TOKEN_ID_UTXO,
},
group::{Group, GroupQuery, MemberItem, UtxoDataOutput},
index_tx::IndexTx,
@@ -107,6 +108,7 @@
GroupHistoryConf {
cf_page_name: CF_TOKEN_ID_HISTORY,
cf_num_txs_name: CF_TOKEN_ID_HISTORY_NUM_TXS,
+ cf_cache_name: CF_TOKEN_ID_HISTORY_CACHE,
page_size: 1000,
}
}
diff --git a/chronik/chronik-db/src/io/group_history.rs b/chronik/chronik-db/src/io/group_history.rs
--- a/chronik/chronik-db/src/io/group_history.rs
+++ b/chronik/chronik-db/src/io/group_history.rs
@@ -5,7 +5,9 @@
use std::{collections::BTreeMap, marker::PhantomData, time::Instant};
use abc_rust_error::Result;
+use chronik_util::{log, log_chronik};
use rocksdb::WriteBatch;
+use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::{
@@ -13,9 +15,10 @@
group::{tx_members_for_group, Group, GroupQuery},
index_tx::IndexTx,
io::{
- group_history::GroupHistoryError::*, merge::catch_merge_errors, TxNum,
+ group_history::GroupHistoryError::*, merge::catch_merge_errors,
+ BlockHeight, BlockReader, TxNum,
},
- ser::{db_deserialize_vec, db_serialize_vec},
+ ser::{db_deserialize, db_deserialize_vec, db_serialize, db_serialize_vec},
};
/// Represent page numbers with 32-bit unsigned integers.
@@ -27,6 +30,8 @@
const CONCAT: u8 = b'C';
const TRIM: u8 = b'T';
+const KEY_CACHE_BLOOM: &[u8] = b"bloom";
+
/// Configuration for group history reader/writers.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct GroupHistoryConf {
@@ -34,6 +39,9 @@
pub cf_page_name: &'static str,
/// Column family to store the last page num of the group history.
pub cf_num_txs_name: &'static str,
+ /// Column family to store serialized cache data (e.g. bloom filter) of the
+ /// group history.
+ pub cf_cache_name: &'static str,
/// Page size for each member of the group.
pub page_size: NumTxs,
}
@@ -42,6 +50,7 @@
db: &'a Db,
cf_page: &'a CF,
cf_num_txs: &'a CF,
+ cf_cache: &'a CF,
}
/// Write txs grouped and paginated to the DB.
@@ -61,6 +70,19 @@
/// also between pages, such that the entire tx history of a member can be
/// iterated by going through pages 0..N and going through all of the txs of
/// each page.
+///
+/// Optionally, there is also a bloom filter speeding up indexing significantly,
+/// which lets us know if a member is definitely not in the DB, and we can
+/// assume the number of txs for this member is 0. This is a very common case
+/// (most scripts are used only a few times, and one of them must be the first),
+/// and in this case it allows us to skip reading from the DB completely and
+/// just issue a merge_cf call right away.
+///
+/// We store the bloom filter at shutdown and load it at init. We have to make
+/// 100% sure the bloom filter is valid and up-to-date with the rest of the DB,
+/// because otherwise we don't have the guarantee that a script isn't in the DB
+/// if it's not in the bloom filter. If that were to fail, we could insert the
+/// history entry on the wrong page, causing all sorts of problems.
#[derive(Debug)]
pub struct GroupHistoryWriter<'a, G: Group> {
col: GroupHistoryColumn<'a>,
@@ -76,11 +98,36 @@
phantom: PhantomData<G>,
}
+/// Settings for group history, e.g. whether to use a bloom filter or LRU cache.
+#[derive(Clone, Debug, Default)]
+pub struct GroupHistorySettings {
+ /// Whether to use a bloom filter to determine if a member has any history
+ pub is_bloom_filter_enabled: bool,
+ /// Bloom filter false positive rate
+ pub false_positive_rate: f64,
+ /// Expected number of total distinct members of the group
+ pub expected_num_items: usize,
+}
+
/// In-memory data for the tx history.
#[derive(Debug, Default)]
pub struct GroupHistoryMemData {
/// Stats about cache hits, num requests etc.
pub stats: GroupHistoryStats,
+ /// In-memory data to speed up indexing, e.g. LRU caches or bloom filters
+ pub cache: GroupHistoryCache,
+}
+
+/// In-memory data to speed up indexing, e.g. LRU caches or bloom filters
+#[derive(Default)]
+pub struct GroupHistoryCache {
+ bloom: Option<GroupHistoryBloomFilter>,
+}
+
+struct GroupHistoryBloomFilter {
+ bloom_filter: bloomfilter::Bloom<[u8]>,
+ false_positive_rate: f64,
+ expected_num_items: usize,
}
/// Stats about cache hits, num requests etc.
@@ -88,18 +135,30 @@
pub struct GroupHistoryStats {
/// Total number of members updated.
pub n_total: usize,
+ /// Num of total hits of the bloom filter
+ pub n_bloom_hits: usize,
+ /// Num of hits that turned out to be false positives
+ pub n_bloom_false_positives: usize,
+ /// Size of the bloom filter data in bytes
+ pub n_bloom_num_bytes: usize,
+ /// Number of entries fetched from the DB
+ pub n_fetched: usize,
/// Time [s] for insert/delete.
pub t_total: f64,
/// Time [s] for grouping txs.
pub t_group: f64,
/// Time [s] for serializing members.
pub t_ser_members: f64,
+ /// Time [s] for checking the bloom filter.
+ pub t_bloom: f64,
/// Time [s] for fetching existing tx data.
pub t_fetch: f64,
+ /// Time [s] for counting the set bits in the bloom filter at startup.
+ pub t_init_num_bits: f64,
}
/// Error indicating that something went wrong with writing group history data.
-#[derive(Debug, Error, PartialEq, Eq)]
+#[derive(Debug, Error, PartialEq)]
pub enum GroupHistoryError {
/// Bad num_txs size
#[error("Inconsistent DB: Bad num_txs size: {0:?}")]
@@ -111,14 +170,60 @@
hex::encode(.1),
)]
UnknownOperandPrefix(u8, Vec<u8>),
+
+ /// Mismatched bloom filter settings: configured <> DB
+ #[error(
+ "Mismatched bloom filter settings: configured fpr \
+ {configured_fp_rate}, DB has {db_fp_rate}; configured expected N \
+ {configured_expected_n}, DB has {db_expected_n}"
+ )]
+ MismatchedBloomFilterSettings {
+ /// Configured FP rate
+ configured_fp_rate: f64,
+ /// FP rate in the DB
+ db_fp_rate: f64,
+ /// Configured expected N
+ configured_expected_n: usize,
+ /// Expected N in the DB
+ db_expected_n: usize,
+ },
+
+ /// Mismatched bloom filter block height: DB <> bloom filter
+ #[error(
+ "Inconsistent group history bloom filter block height: DB is at \
+ {db_height} but bloom filter is for {bloom_height}, consider wiping \
+ the bloom filter or -chronikreindex to restore"
+ )]
+ MismatchedBloomFilterHeight {
+ /// Block height of the DB (BlockWriter)
+ db_height: BlockHeight,
+ /// Block height in the existing bloom filter
+ bloom_height: BlockHeight,
+ },
+}
+
+enum BloomResult {
+ Hit,
+ NoHit,
}
struct FetchedNumTxs<'tx, G: Group> {
- members_num_txs: Vec<NumTxs>,
+ members_num_txs: Vec<(NumTxs, BloomResult)>,
grouped_txs: BTreeMap<G::Member<'tx>, Vec<TxNum>>,
ser_members: Vec<G::MemberSer<'tx>>,
}
+#[derive(Serialize, Deserialize)]
+struct SerBloomFilter {
+ block_height: BlockHeight,
+ bytes: Vec<u8>,
+ bitmap_bits: u64,
+ k_num: u32,
+ sip_keys: [(u64, u64); 2],
+ false_positive_rate: f64,
+ expected_num_items: usize,
+}
+
pub(crate) fn bytes_to_num_txs(bytes: &[u8]) -> Result<NumTxs> {
Ok(NumTxs::from_be_bytes(
bytes
@@ -174,10 +279,12 @@
fn new(db: &'a Db, conf: &GroupHistoryConf) -> Result<Self> {
let cf_page = db.cf(conf.cf_page_name)?;
let cf_num_txs = db.cf(conf.cf_num_txs_name)?;
+ let cf_cache = db.cf(conf.cf_cache_name)?;
Ok(GroupHistoryColumn {
db,
cf_page,
cf_num_txs,
+ cf_cache,
})
}
@@ -203,6 +310,107 @@
Ok(GroupHistoryWriter { col, conf, group })
}
+ /// Load cache data from the DB to `mem_data` at startup.
+ /// For the bloom filter, this is important to get right, otherwise we would
+ /// get false negatives and a garbled history.
+ pub fn init(&self, mem_data: &mut GroupHistoryMemData) -> Result<()> {
+ let block_height = BlockReader::new(self.col.db)?.height()?;
+ if let Some(ser_bloom_filter) =
+ self.col.db.get(self.col.cf_cache, KEY_CACHE_BLOOM)?
+ {
+ let bloom_filter =
+ db_deserialize::<SerBloomFilter>(&ser_bloom_filter)?;
+ let t_num_bits = Instant::now();
+ let num_bits = bloom_filter
+ .bytes
+ .iter()
+ .map(|byte| byte.count_ones())
+ .sum::<u32>();
+ mem_data.stats.t_init_num_bits = t_num_bits.elapsed().as_secs_f64();
+ if let Some(configured) = &mem_data.cache.bloom {
+ if configured.bloom_filter.number_of_bits()
+ != bloom_filter.bitmap_bits
+ || configured.bloom_filter.number_of_hash_functions()
+ != bloom_filter.k_num
+ {
+ return Err(MismatchedBloomFilterSettings {
+ configured_fp_rate: configured.false_positive_rate,
+ db_fp_rate: bloom_filter.false_positive_rate,
+ configured_expected_n: configured.expected_num_items,
+ db_expected_n: bloom_filter.expected_num_items,
+ }
+ .into());
+ }
+ if bloom_filter.block_height != block_height {
+ return Err(MismatchedBloomFilterHeight {
+ db_height: block_height,
+ bloom_height: bloom_filter.block_height,
+ }
+ .into());
+ }
+ log_chronik!(
+ "Loaded bloom filter for {:?}, with {num_bits} bits set\n",
+ self.conf.cf_page_name,
+ );
+ mem_data.cache.bloom = Some(GroupHistoryBloomFilter {
+ bloom_filter: bloomfilter::Bloom::from_existing(
+ &bloom_filter.bytes,
+ bloom_filter.bitmap_bits,
+ bloom_filter.k_num,
+ bloom_filter.sip_keys,
+ ),
+ false_positive_rate: configured.false_positive_rate,
+ expected_num_items: configured.expected_num_items,
+ });
+ } else {
+ log!(
+ "Chronik: Ignoring existing bloom filter in DB (for block \
+ height {}, {} available bits, {} bits set, {:.1}% fpr, \
+ {} expected num items); will be wiped at shutdown\n",
+ bloom_filter.block_height,
+ bloom_filter.bitmap_bits,
+ num_bits,
+ bloom_filter.false_positive_rate * 100.0,
+ bloom_filter.expected_num_items,
+ );
+ }
+ }
+ Ok(())
+ }
+
+ /// Write cache data to the DB at shutdown so we can restore it later.
+ pub fn shutdown(&self, mem_data: &mut GroupHistoryMemData) -> Result<()> {
+ let block_height = BlockReader::new(self.col.db)?.height()?;
+ let mut batch = WriteBatch::default();
+ if let Some(bloom) = &mem_data.cache.bloom {
+ let bloom_filter = SerBloomFilter {
+ block_height,
+ bytes: bloom.bloom_filter.bitmap(),
+ bitmap_bits: bloom.bloom_filter.number_of_bits(),
+ k_num: bloom.bloom_filter.number_of_hash_functions(),
+ sip_keys: bloom.bloom_filter.sip_keys(),
+ false_positive_rate: bloom.false_positive_rate,
+ expected_num_items: bloom.expected_num_items,
+ };
+ let ser_bloom_filter = db_serialize(&bloom_filter)?;
+
+ batch.put_cf(self.col.cf_cache, KEY_CACHE_BLOOM, &ser_bloom_filter);
+ } else {
+ let bloom = self.col.db.get(self.col.cf_cache, KEY_CACHE_BLOOM)?;
+ if bloom.is_some() {
+ log!(
+ "Chronik: Deleting existing bloom filter for {:?}\n",
+ self.conf.cf_page_name,
+ );
+ }
+ // Important: delete bloom filter from DB if unset to prevent false
+ // negatives
+ batch.delete_cf(self.col.cf_cache, KEY_CACHE_BLOOM);
+ }
+ self.col.db.write_batch(batch)?;
+ Ok(())
+ }
+
/// Group the txs, then insert them to into each member of the group.
pub fn insert(
&self,
@@ -213,11 +421,12 @@
) -> Result<()> {
let t_start = Instant::now();
let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?;
- for ((mut new_tx_nums, member_ser), mut num_txs) in fetched
- .grouped_txs
- .into_values()
- .zip(fetched.ser_members)
- .zip(fetched.members_num_txs)
+ for ((mut new_tx_nums, member_ser), (mut num_txs, bloom_result)) in
+ fetched
+ .grouped_txs
+ .into_values()
+ .zip(fetched.ser_members)
+ .zip(fetched.members_num_txs)
{
let mut page_num = num_txs / self.conf.page_size;
let mut last_page_num_txs = num_txs % self.conf.page_size;
@@ -239,6 +448,9 @@
member_ser.as_ref(),
num_txs.to_be_bytes(),
);
+ if matches!(bloom_result, BloomResult::NoHit) {
+ mem_data.cache.add_to_bloom_filter(member_ser.as_ref());
+ }
break;
}
last_page_num_txs = 0;
@@ -258,8 +470,18 @@
mem_data: &mut GroupHistoryMemData,
) -> Result<()> {
let t_start = Instant::now();
+
+ // On reorg, wipe bloom filter. This is because we cannot ever remove
+ // elements from a (normal) bloom filter. In practice, the main
+ // benefit for the bloom filter is during IBD, where no reorgs occur.
+ // If tx volume picks up and reorgs wiping the bloom filter becomes a
+ // problem, we can introduce checkpointing such that recent
+ // bloom filters can be recovered. With the addition of Avalanche, the
+ // deepest reorg depth should only ever be 1.
+ mem_data.cache.bloom = None;
+
let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?;
- for ((mut removed_tx_nums, member_ser), mut num_txs) in fetched
+ for ((mut removed_tx_nums, member_ser), (mut num_txs, _)) in fetched
.grouped_txs
.into_values()
.zip(fetched.ser_members)
@@ -320,7 +542,7 @@
aux: &G::Aux,
mem_data: &mut GroupHistoryMemData,
) -> Result<FetchedNumTxs<'tx, G>> {
- let GroupHistoryMemData { stats } = mem_data;
+ let GroupHistoryMemData { stats, cache } = mem_data;
let t_group = Instant::now();
let grouped_txs = self.group_txs(txs, aux);
stats.t_group += t_group.elapsed().as_secs_f64();
@@ -334,19 +556,45 @@
stats.n_total += grouped_txs.len();
+ let t_bloom = Instant::now();
+ let mut members_num_txs = Vec::with_capacity(ser_members.len());
+ for member_ser in &ser_members {
+ if cache.check_bloom_filter(member_ser.as_ref()) {
+ stats.n_bloom_hits += 1;
+ members_num_txs.push((0, BloomResult::Hit));
+ } else {
+ members_num_txs.push((0, BloomResult::NoHit));
+ }
+ }
+ stats.t_bloom += t_bloom.elapsed().as_secs_f64();
+
let t_fetch = Instant::now();
- let num_txs_keys =
- ser_members.iter().map(|member_ser| member_ser.as_ref());
+ let num_txs_keys = ser_members.iter().zip(&members_num_txs).filter_map(
+ |(member_ser, (_, bloom_result))| match bloom_result {
+ BloomResult::Hit => Some(member_ser.as_ref()),
+ BloomResult::NoHit => None,
+ },
+ );
let fetched_num_txs =
self.col
.db
.multi_get(self.col.cf_num_txs, num_txs_keys, true)?;
- let mut members_num_txs = Vec::with_capacity(fetched_num_txs.len());
- for db_num_txs in fetched_num_txs {
- members_num_txs.push(match db_num_txs {
- Some(db_num_txs) => bytes_to_num_txs(&db_num_txs)?,
- None => 0,
- });
+ stats.n_fetched += fetched_num_txs.len();
+ for ((member_num_txs, _), db_num_txs) in members_num_txs
+ .iter_mut()
+ .filter(|(_, bloom_result)| {
+ matches!(bloom_result, BloomResult::Hit)
+ })
+ .zip(fetched_num_txs)
+ {
+ match db_num_txs {
+ Some(db_num_txs) => {
+ *member_num_txs = bytes_to_num_txs(&db_num_txs)?;
+ }
+ None => {
+ stats.n_bloom_false_positives += 1;
+ }
+ }
}
stats.t_fetch += t_fetch.elapsed().as_secs_f64();
@@ -402,6 +650,53 @@
conf.cf_num_txs_name,
rocksdb::Options::default(),
));
+ columns.push(rocksdb::ColumnFamilyDescriptor::new(
+ conf.cf_cache_name,
+ rocksdb::Options::default(),
+ ));
+ }
+}
+
+impl GroupHistoryMemData {
+ /// Create a new [`GroupHistoryMemData`] using the given
+ /// [`GroupHistorySettings`].
+ pub fn new(settings: GroupHistorySettings) -> Self {
+ let mut stats = GroupHistoryStats::default();
+ let cache = match settings.is_bloom_filter_enabled {
+ false => GroupHistoryCache { bloom: None },
+ true => GroupHistoryCache {
+ bloom: Some({
+ let bloom_filter = bloomfilter::Bloom::new_for_fp_rate(
+ settings.expected_num_items,
+ settings.false_positive_rate,
+ );
+ stats.n_bloom_num_bytes =
+ (bloom_filter.number_of_bits() as usize + 7) / 8;
+ GroupHistoryBloomFilter {
+ bloom_filter,
+ false_positive_rate: settings.false_positive_rate,
+ expected_num_items: settings.expected_num_items,
+ }
+ }),
+ },
+ };
+ GroupHistoryMemData { cache, stats }
+ }
+}
+
+impl GroupHistoryCache {
+ fn check_bloom_filter(&self, member_ser: &[u8]) -> bool {
+ if let Some(bloom) = &self.bloom {
+ bloom.bloom_filter.check(member_ser)
+ } else {
+ true
+ }
+ }
+
+ fn add_to_bloom_filter(&mut self, member_ser: &[u8]) {
+ if let Some(bloom) = &mut self.bloom {
+ bloom.bloom_filter.set(member_ser);
+ }
}
}
@@ -411,6 +706,12 @@
}
}
+impl std::fmt::Debug for GroupHistoryCache {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "GroupHistoryCache {{ .. }}")
+ }
+}
+
impl<'a, G: Group> GroupHistoryReader<'a, G> {
/// Create a new [`GroupHistoryReader`].
pub fn new(db: &'a Db) -> Result<Self> {
diff --git a/chronik/chronik-db/src/mem/data.rs b/chronik/chronik-db/src/mem/data.rs
--- a/chronik/chronik-db/src/mem/data.rs
+++ b/chronik/chronik-db/src/mem/data.rs
@@ -3,8 +3,9 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
use crate::io::{
- GroupHistoryMemData, GroupHistoryStats, GroupUtxoMemData, GroupUtxoStats,
- SpentByMemData, SpentByStats, TxsMemData, TxsStats,
+ GroupHistoryMemData, GroupHistorySettings, GroupHistoryStats,
+ GroupUtxoMemData, GroupUtxoStats, SpentByMemData, SpentByStats, TxsMemData,
+ TxsStats,
};
/// In-memory data for Chronik, e.g. caches, perf statistics.
@@ -35,14 +36,17 @@
/// Config for in-memory data for Chronik.
#[derive(Clone, Debug)]
-pub struct MemDataConf {}
+pub struct MemDataConf {
+ /// Settings for script history
+ pub script_history: GroupHistorySettings,
+}
impl MemData {
/// Create a new [`MemData`] from the given configuration.
- pub fn new(_: MemDataConf) -> Self {
+ pub fn new(conf: MemDataConf) -> Self {
MemData {
txs: TxsMemData::default(),
- script_history: GroupHistoryMemData::default(),
+ script_history: GroupHistoryMemData::new(conf.script_history),
script_utxos: GroupUtxoMemData::default(),
spent_by: SpentByMemData::default(),
}
diff --git a/chronik/chronik-db/src/test/value_group.rs b/chronik/chronik-db/src/test/value_group.rs
--- a/chronik/chronik-db/src/test/value_group.rs
+++ b/chronik/chronik-db/src/test/value_group.rs
@@ -65,6 +65,7 @@
GroupHistoryConf {
cf_page_name: "value_history",
cf_num_txs_name: "value_history_num_txs",
+ cf_cache_name: "value_history_cache",
page_size: 4,
}
}
diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs
--- a/chronik/chronik-indexer/src/indexer.rs
+++ b/chronik/chronik-indexer/src/indexer.rs
@@ -21,9 +21,9 @@
index_tx::prepare_indexed_txs,
io::{
merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter,
- BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData,
- MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry,
- TxReader, TxWriter,
+ BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData,
+ GroupHistorySettings, GroupUtxoMemData, MetadataReader, MetadataWriter,
+ SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter,
},
mem::{MemData, MemDataConf, Mempool, MempoolTx},
};
@@ -42,7 +42,7 @@
subs_group::TxMsgType,
};
-const CURRENT_INDEXER_VERSION: SchemaVersion = 10;
+const CURRENT_INDEXER_VERSION: SchemaVersion = 11;
/// Params for setting up a [`ChronikIndexer`] instance.
#[derive(Clone)]
@@ -55,6 +55,8 @@
pub enable_token_index: bool,
/// Whether to output Chronik performance statistics into a perf/ folder
pub enable_perf_stats: bool,
+ /// Tune script history indexing
+ pub script_history: GroupHistorySettings,
}
/// Struct for indexing blocks and txs. Maintains db handles and mempool.
@@ -180,10 +182,16 @@
verify_schema_version(&db)?;
verify_enable_token_index(&db, params.enable_token_index)?;
let mempool = Mempool::new(ScriptGroup, params.enable_token_index);
+
+ let mut mem_data = MemData::new(MemDataConf {
+ script_history: params.script_history,
+ });
+ let script_history_writer = ScriptHistoryWriter::new(&db, ScriptGroup)?;
+ script_history_writer.init(&mut mem_data.script_history)?;
Ok(ChronikIndexer {
db,
mempool,
- mem_data: MemData::new(MemDataConf {}),
+ mem_data,
script_group: ScriptGroup,
avalanche: Avalanche::default(),
subs: RwLock::new(Subs::new(ScriptGroup)),
@@ -533,6 +541,14 @@
}
}
+ /// Shut down the indexer, i.e. store any pending mem data to disk.
+ pub fn shutdown(&mut self) -> Result<()> {
+ let script_history_writer =
+ ScriptHistoryWriter::new(&self.db, ScriptGroup)?;
+ script_history_writer.shutdown(&mut self.mem_data.script_history)?;
+ Ok(())
+ }
+
/// Return [`QueryBlocks`] to read blocks from the DB.
pub fn blocks(&self) -> QueryBlocks<'_> {
QueryBlocks {
@@ -757,6 +773,7 @@
wipe_db: false,
enable_token_index: false,
enable_perf_stats: false,
+ script_history: Default::default(),
};
// regtest folder doesn't exist yet -> error
assert_eq!(
@@ -825,6 +842,7 @@
wipe_db: false,
enable_token_index: false,
enable_perf_stats: false,
+ script_history: Default::default(),
};
// Setting up DB first time sets the schema version
diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs
--- a/chronik/chronik-lib/src/bridge.rs
+++ b/chronik/chronik-lib/src/bridge.rs
@@ -13,7 +13,7 @@
use abc_rust_error::Result;
use bitcoinsuite_core::tx::{Tx, TxId};
use chronik_bridge::{ffi::init_error, util::expect_unique_ptr};
-use chronik_db::mem::MempoolTx;
+use chronik_db::{io::GroupHistorySettings, mem::MempoolTx};
use chronik_http::server::{
ChronikServer, ChronikServerParams, ChronikSettings,
};
@@ -81,6 +81,11 @@
wipe_db: params.wipe_db,
enable_token_index: params.enable_token_index,
enable_perf_stats: params.enable_perf_stats,
+ script_history: GroupHistorySettings {
+ is_bloom_filter_enabled: params.script_history.is_bloom_enabled,
+ false_positive_rate: params.script_history.false_positive_rate,
+ expected_num_items: params.script_history.expected_num_items,
+ },
})?;
indexer.resync_indexer(bridge_ref)?;
if chronik_bridge::ffi::shutdown_requested() {
@@ -204,6 +209,13 @@
ok_or_abort_node("handle_block_finalized", self.finalize_block(bindex));
}
+ /// Shutdown Chronik gracefully
+ pub fn shutdown(&self) {
+ if let Err(err) = self.indexer.blocking_write().shutdown() {
+ log!("*** CHRONIK ERROR: Shutdown had an error: {err}\n");
+ }
+ }
+
fn add_tx_to_mempool(
&self,
ptx: &ffi::CTransaction,
diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs
--- a/chronik/chronik-lib/src/ffi.rs
+++ b/chronik/chronik-lib/src/ffi.rs
@@ -31,6 +31,19 @@
pub enable_perf_stats: bool,
/// Duration between WebSocket pings initiated by Chronik.
pub ws_ping_interval_secs: u64,
+ /// Settings for indexing script history
+ pub script_history: GroupHistoryParams,
+ }
+
+ /// Configure how to index group history
+ #[derive(Debug)]
+ pub struct GroupHistoryParams {
+ /// Whether to enable the bloom filter optimization
+ pub is_bloom_enabled: bool,
+ /// FP rate of the bloom filter
+ pub false_positive_rate: f64,
+ /// Expected number of items in the group
+ pub expected_num_items: usize,
}
extern "Rust" {
@@ -55,6 +68,7 @@
bindex: &CBlockIndex,
);
fn handle_block_finalized(&self, bindex: &CBlockIndex);
+ fn shutdown(&self);
}
unsafe extern "C++" {
diff --git a/src/init.cpp b/src/init.cpp
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -433,6 +433,7 @@
"-replayprotectionactivationtime",
"-enableminerfund",
"-chronikallowpause",
+ "-chronikscripthistoryexpectednumitems",
// GUI args. These will be overwritten by SetupUIArgs for the GUI
"-allowselfsignedrootcertificates",
"-choosedatadir",
@@ -654,6 +655,11 @@
"Output some performance statistics (e.g. num cache hits, "
"seconds spent) into a <datadir>/perf folder. (default: 0)",
ArgsManager::ALLOW_BOOL, OptionsCategory::CHRONIK);
+ argsman.AddArg("-chronikscripthistorybloomfilter",
+ strprintf("Whether to enable a bloom filter to speed up "
+ "script history (default: %u)",
+ chronik::DEFAULT_SCRIPT_HISTORY_BLOOM_IS_ENABLED),
+ ArgsManager::ALLOW_BOOL, OptionsCategory::CHRONIK);
#endif
argsman.AddArg(
"-blockfilterindex=<type>",
diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py
--- a/test/lint/check-doc.py
+++ b/test/lint/check-doc.py
@@ -48,6 +48,7 @@
"-automaticunparking",
"-avalanchepreconsensus",
"-chronikallowpause",
+ "-chronikscripthistoryexpectednumitems",
"-dbcrashratio",
"-enableminerfund",
"-forcecompactdb",

File Metadata

Mime Type
text/plain
Expires
Sat, Apr 26, 10:43 (5 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5573322
Default Alt Text
D15465.id45297.diff (34 KB)

Event Timeline