Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F13711180
D15465.id45297.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
34 KB
Subscribers
None
D15465.id45297.diff
View Options
diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -173,6 +173,12 @@
"syn 2.0.29",
]
+[[package]]
+name = "bit-vec"
+version = "0.6.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb"
+
[[package]]
name = "bitcoinsuite-core"
version = "0.1.0"
@@ -222,6 +228,17 @@
"generic-array",
]
+[[package]]
+name = "bloomfilter"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b92db7965d438b8b4b1c1d0aedd188440a1084593c9eb7f6657e3df7e906d934"
+dependencies = [
+ "bit-vec",
+ "getrandom",
+ "siphasher",
+]
+
[[package]]
name = "byteorder"
version = "1.4.3"
@@ -292,6 +309,7 @@
"bimap",
"bitcoinsuite-core",
"bitcoinsuite-slp",
+ "bloomfilter",
"bytes",
"chronik-util",
"fastrand",
@@ -1623,6 +1641,12 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
+[[package]]
+name = "siphasher"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54ac45299ccbd390721be55b412d41931911f654fa99e2cb8bfb57184b2061fe"
+
[[package]]
name = "slab"
version = "0.4.9"
diff --git a/chronik/chronik-cpp/chronik.h b/chronik/chronik-cpp/chronik.h
--- a/chronik/chronik-cpp/chronik.h
+++ b/chronik/chronik-cpp/chronik.h
@@ -17,6 +17,11 @@
static const std::vector<std::string> DEFAULT_BINDS = {"127.0.0.1", "::1"};
+static const bool DEFAULT_SCRIPT_HISTORY_BLOOM_IS_ENABLED = true;
+static const double DEFAULT_SCRIPT_HISTORY_BLOOM_FALSE_POSITIVE_RATE = 0.90;
+static const size_t DEFAULT_SCRIPT_HISTORY_BLOOM_EXPECTED_NUM_ITEMS =
+ 100'000'000;
+
// Registers Chronik indexer as ValidationInterface, listens to HTTP queries
bool Start(const Config &config, const node::NodeContext &node, bool fWipe);
diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp
--- a/chronik/chronik-cpp/chronik.cpp
+++ b/chronik/chronik-cpp/chronik.cpp
@@ -57,6 +57,17 @@
params.NetworkIDString() == CBaseChainParams::REGTEST
? uint64_t(count_seconds(WS_PING_INTERVAL_REGTEST))
: uint64_t(count_seconds(WS_PING_INTERVAL_DEFAULT)),
+ .script_history =
+ {
+ .is_bloom_enabled = gArgs.GetBoolArg(
+ "-chronikscripthistorybloomfilter",
+ DEFAULT_SCRIPT_HISTORY_BLOOM_IS_ENABLED),
+ .false_positive_rate =
+ DEFAULT_SCRIPT_HISTORY_BLOOM_FALSE_POSITIVE_RATE,
+ .expected_num_items = (size_t)gArgs.GetIntArg(
+ "-chronikscripthistoryexpectednumitems",
+ DEFAULT_SCRIPT_HISTORY_BLOOM_EXPECTED_NUM_ITEMS),
+ },
},
config, node);
}
diff --git a/chronik/chronik-cpp/chronik_validationinterface.cpp b/chronik/chronik-cpp/chronik_validationinterface.cpp
--- a/chronik/chronik-cpp/chronik_validationinterface.cpp
+++ b/chronik/chronik-cpp/chronik_validationinterface.cpp
@@ -23,7 +23,10 @@
void Register() { RegisterValidationInterface(this); }
- void Unregister() { UnregisterValidationInterface(this); }
+ void Shutdown() {
+ UnregisterValidationInterface(this);
+ m_chronik->shutdown();
+ }
private:
rust::Box<chronik_bridge::Chronik> m_chronik;
@@ -76,7 +79,7 @@
void StopChronikValidationInterface() {
if (g_chronik_validation_interface) {
- g_chronik_validation_interface->Unregister();
+ g_chronik_validation_interface->Shutdown();
// Reset so the Box is dropped and all handles are released.
g_chronik_validation_interface.reset();
}
diff --git a/chronik/chronik-db/Cargo.toml b/chronik/chronik-db/Cargo.toml
--- a/chronik/chronik-db/Cargo.toml
+++ b/chronik/chronik-db/Cargo.toml
@@ -22,6 +22,8 @@
# Efficient byte strings, with ref counted substrings
bytes = "1.4"
+bloomfilter = "1.0"
+
# En-/decode byte strings from/to hex
hex = "0.4"
diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs
--- a/chronik/chronik-db/src/db.rs
+++ b/chronik/chronik-db/src/db.rs
@@ -45,12 +45,16 @@
pub const CF_SCRIPT_HISTORY: &str = "script_history";
/// Column family to store number of txs by script.
pub const CF_SCRIPT_HISTORY_NUM_TXS: &str = "script_history_num_txs";
+/// Column family to store cache data for the script history
+pub const CF_SCRIPT_HISTORY_CACHE: &str = "script_history_cache";
/// Column family for utxos by script.
pub const CF_SCRIPT_UTXO: &str = "script_utxo";
/// Column family to store tx history by token ID.
pub const CF_TOKEN_ID_HISTORY: &str = "token_id_history";
/// Column family to store number of txs by token ID.
pub const CF_TOKEN_ID_HISTORY_NUM_TXS: &str = "token_id_history_num_txs";
+/// Column family to store cache data for token ID history
+pub const CF_TOKEN_ID_HISTORY_CACHE: &str = "token_id_history_cache";
/// Column family for utxos by token ID.
pub const CF_TOKEN_ID_UTXO: &str = "token_id_utxo";
/// Column family to store which outputs have been spent by which tx inputs.
diff --git a/chronik/chronik-db/src/groups/script.rs b/chronik/chronik-db/src/groups/script.rs
--- a/chronik/chronik-db/src/groups/script.rs
+++ b/chronik/chronik-db/src/groups/script.rs
@@ -6,7 +6,10 @@
use bytes::Bytes;
use crate::{
- db::{CF_SCRIPT_HISTORY, CF_SCRIPT_HISTORY_NUM_TXS, CF_SCRIPT_UTXO},
+ db::{
+ CF_SCRIPT_HISTORY, CF_SCRIPT_HISTORY_CACHE, CF_SCRIPT_HISTORY_NUM_TXS,
+ CF_SCRIPT_UTXO,
+ },
group::{Group, GroupQuery, MemberItem, UtxoDataValue},
io::{
GroupHistoryConf, GroupHistoryReader, GroupHistoryWriter,
@@ -84,6 +87,7 @@
GroupHistoryConf {
cf_page_name: CF_SCRIPT_HISTORY,
cf_num_txs_name: CF_SCRIPT_HISTORY_NUM_TXS,
+ cf_cache_name: CF_SCRIPT_HISTORY_CACHE,
page_size: 1000,
}
}
diff --git a/chronik/chronik-db/src/groups/token_id.rs b/chronik/chronik-db/src/groups/token_id.rs
--- a/chronik/chronik-db/src/groups/token_id.rs
+++ b/chronik/chronik-db/src/groups/token_id.rs
@@ -10,7 +10,8 @@
use crate::{
db::{
- Db, CF_TOKEN_ID_HISTORY, CF_TOKEN_ID_HISTORY_NUM_TXS, CF_TOKEN_ID_UTXO,
+ Db, CF_TOKEN_ID_HISTORY, CF_TOKEN_ID_HISTORY_CACHE,
+ CF_TOKEN_ID_HISTORY_NUM_TXS, CF_TOKEN_ID_UTXO,
},
group::{Group, GroupQuery, MemberItem, UtxoDataOutput},
index_tx::IndexTx,
@@ -107,6 +108,7 @@
GroupHistoryConf {
cf_page_name: CF_TOKEN_ID_HISTORY,
cf_num_txs_name: CF_TOKEN_ID_HISTORY_NUM_TXS,
+ cf_cache_name: CF_TOKEN_ID_HISTORY_CACHE,
page_size: 1000,
}
}
diff --git a/chronik/chronik-db/src/io/group_history.rs b/chronik/chronik-db/src/io/group_history.rs
--- a/chronik/chronik-db/src/io/group_history.rs
+++ b/chronik/chronik-db/src/io/group_history.rs
@@ -5,7 +5,9 @@
use std::{collections::BTreeMap, marker::PhantomData, time::Instant};
use abc_rust_error::Result;
+use chronik_util::{log, log_chronik};
use rocksdb::WriteBatch;
+use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::{
@@ -13,9 +15,10 @@
group::{tx_members_for_group, Group, GroupQuery},
index_tx::IndexTx,
io::{
- group_history::GroupHistoryError::*, merge::catch_merge_errors, TxNum,
+ group_history::GroupHistoryError::*, merge::catch_merge_errors,
+ BlockHeight, BlockReader, TxNum,
},
- ser::{db_deserialize_vec, db_serialize_vec},
+ ser::{db_deserialize, db_deserialize_vec, db_serialize, db_serialize_vec},
};
/// Represent page numbers with 32-bit unsigned integers.
@@ -27,6 +30,8 @@
const CONCAT: u8 = b'C';
const TRIM: u8 = b'T';
+const KEY_CACHE_BLOOM: &[u8] = b"bloom";
+
/// Configuration for group history reader/writers.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct GroupHistoryConf {
@@ -34,6 +39,9 @@
pub cf_page_name: &'static str,
/// Column family to store the last page num of the group history.
pub cf_num_txs_name: &'static str,
+ /// Column family to store serialized cache data (e.g. bloom filter) of the
+ /// group history.
+ pub cf_cache_name: &'static str,
/// Page size for each member of the group.
pub page_size: NumTxs,
}
@@ -42,6 +50,7 @@
db: &'a Db,
cf_page: &'a CF,
cf_num_txs: &'a CF,
+ cf_cache: &'a CF,
}
/// Write txs grouped and paginated to the DB.
@@ -61,6 +70,19 @@
/// also between pages, such that the entire tx history of a member can be
/// iterated by going through pages 0..N and going through all of the txs of
/// each page.
+///
+/// Optionally, there is also a bloom filter speeding up indexing significantly,
+/// which lets us know if a member is definitely not in the DB, and we can
+/// assume the number of txs for this member is 0. This is a very common case
+/// (most scripts are used only a few times, and one of them must be the first),
+/// and in this case it allows us to skip reading from the DB completely and
+/// just issue a merge_cf call right away.
+///
+/// We store the bloom filter at shutdown and load it at init. We have to make
+/// 100% sure the bloom filter is valid and up-to-date with the rest of the DB,
+/// because otherwise we don't have the guarantee that a script isn't in the DB
+/// if it's not in the bloom filter. If that were to fail, we could insert the
+/// history entry on the wrong page, causing all sorts of problems.
#[derive(Debug)]
pub struct GroupHistoryWriter<'a, G: Group> {
col: GroupHistoryColumn<'a>,
@@ -76,11 +98,36 @@
phantom: PhantomData<G>,
}
+/// Settings for group history, e.g. whether to use a bloom filter or LRU cache.
+#[derive(Clone, Debug, Default)]
+pub struct GroupHistorySettings {
+ /// Whether to use a bloom filter to determine if a member has any history
+ pub is_bloom_filter_enabled: bool,
+ /// Bloom filter false positive rate
+ pub false_positive_rate: f64,
+ /// Expected number of total distinct members of the group
+ pub expected_num_items: usize,
+}
+
/// In-memory data for the tx history.
#[derive(Debug, Default)]
pub struct GroupHistoryMemData {
/// Stats about cache hits, num requests etc.
pub stats: GroupHistoryStats,
+ /// In-memory data to speed up indexing, e.g. LRU caches or bloom filters
+ pub cache: GroupHistoryCache,
+}
+
+/// In-memory data to speed up indexing, e.g. LRU caches or bloom filters
+#[derive(Default)]
+pub struct GroupHistoryCache {
+ bloom: Option<GroupHistoryBloomFilter>,
+}
+
+struct GroupHistoryBloomFilter {
+ bloom_filter: bloomfilter::Bloom<[u8]>,
+ false_positive_rate: f64,
+ expected_num_items: usize,
}
/// Stats about cache hits, num requests etc.
@@ -88,18 +135,30 @@
pub struct GroupHistoryStats {
/// Total number of members updated.
pub n_total: usize,
+ /// Num of total hits of the bloom filter
+ pub n_bloom_hits: usize,
+ /// Num of hits that turned out to be false positives
+ pub n_bloom_false_positives: usize,
+ /// Size of the bloom filter data in bytes
+ pub n_bloom_num_bytes: usize,
+ /// Number of entries fetched from the DB
+ pub n_fetched: usize,
/// Time [s] for insert/delete.
pub t_total: f64,
/// Time [s] for grouping txs.
pub t_group: f64,
/// Time [s] for serializing members.
pub t_ser_members: f64,
+ /// Time [s] for checking the bloom filter.
+ pub t_bloom: f64,
/// Time [s] for fetching existing tx data.
pub t_fetch: f64,
+ /// Time [s] for counting the set bits in the bloom filter at startup.
+ pub t_init_num_bits: f64,
}
/// Error indicating that something went wrong with writing group history data.
-#[derive(Debug, Error, PartialEq, Eq)]
+#[derive(Debug, Error, PartialEq)]
pub enum GroupHistoryError {
/// Bad num_txs size
#[error("Inconsistent DB: Bad num_txs size: {0:?}")]
@@ -111,14 +170,60 @@
hex::encode(.1),
)]
UnknownOperandPrefix(u8, Vec<u8>),
+
+    /// Mismatched bloom filter settings: configured <> DB
+    #[error(
+        "Mismatched bloom filter settings: configured fpr \
+         {configured_fp_rate}, DB has {db_fp_rate}; configured expected N \
+         {configured_expected_n}, DB has {db_expected_n}"
+    )]
+    MismatchedBloomFilterSettings {
+        /// Configured FP rate
+        configured_fp_rate: f64,
+        /// FP rate in the DB
+        db_fp_rate: f64,
+        /// Configured expected N
+        configured_expected_n: usize,
+        /// Expected N in the DB
+        db_expected_n: usize,
+    },
+
+ /// Mismatched bloom filter block height: DB <> bloom filter
+ #[error(
+ "Inconsistent group history bloom filter block height: DB is at \
+ {db_height} but bloom filter is for {bloom_height}, consider wiping \
+ the bloom filter or -chronikreindex to restore"
+ )]
+ MismatchedBloomFilterHeight {
+ /// Block height of the DB (BlockWriter)
+ db_height: BlockHeight,
+ /// Block height in the existing bloom filter
+ bloom_height: BlockHeight,
+ },
+}
+
+enum BloomResult {
+ Hit,
+ NoHit,
}
struct FetchedNumTxs<'tx, G: Group> {
- members_num_txs: Vec<NumTxs>,
+ members_num_txs: Vec<(NumTxs, BloomResult)>,
grouped_txs: BTreeMap<G::Member<'tx>, Vec<TxNum>>,
ser_members: Vec<G::MemberSer<'tx>>,
}
+#[derive(Serialize, Deserialize)]
+struct SerBloomFilter {
+ block_height: BlockHeight,
+ bytes: Vec<u8>,
+ bitmap_bits: u64,
+ k_num: u32,
+ sip_keys: [(u64, u64); 2],
+ false_positive_rate: f64,
+ expected_num_items: usize,
+}
+
pub(crate) fn bytes_to_num_txs(bytes: &[u8]) -> Result<NumTxs> {
Ok(NumTxs::from_be_bytes(
bytes
@@ -174,10 +279,12 @@
fn new(db: &'a Db, conf: &GroupHistoryConf) -> Result<Self> {
let cf_page = db.cf(conf.cf_page_name)?;
let cf_num_txs = db.cf(conf.cf_num_txs_name)?;
+ let cf_cache = db.cf(conf.cf_cache_name)?;
Ok(GroupHistoryColumn {
db,
cf_page,
cf_num_txs,
+ cf_cache,
})
}
@@ -203,6 +310,107 @@
Ok(GroupHistoryWriter { col, conf, group })
}
+    /// Load cache data from the DB to `mem_data` at startup.
+    /// For the bloom filter, this is important to get right otherwise we will
+    /// get false negatives and a garbled history.
+    ///
+    /// - If a bloom filter is stored in the DB and one is configured, the
+    ///   stored one is loaded after checking that its bit count and number of
+    ///   hash functions match the configured filter and that it was stored at
+    ///   the current DB block height.
+    /// - If a bloom filter is stored but none is configured, it is ignored
+    ///   (and deleted by [`GroupHistoryWriter::shutdown`]).
+    /// - If none is stored but one is configured while the DB already has
+    ///   indexed blocks, the configured (empty) filter is dropped, because it
+    ///   would report members that are already in the DB as absent.
+    ///
+    /// # Errors
+    /// [`GroupHistoryError::MismatchedBloomFilterSettings`] or
+    /// [`GroupHistoryError::MismatchedBloomFilterHeight`] if the stored filter
+    /// cannot be used with the current configuration / DB state.
+    pub fn init(&self, mem_data: &mut GroupHistoryMemData) -> Result<()> {
+        let block_height = BlockReader::new(self.col.db)?.height()?;
+        if let Some(ser_bloom_filter) =
+            self.col.db.get(self.col.cf_cache, KEY_CACHE_BLOOM)?
+        {
+            let bloom_filter =
+                db_deserialize::<SerBloomFilter>(&ser_bloom_filter)?;
+            let t_num_bits = Instant::now();
+            let num_bits = bloom_filter
+                .bytes
+                .iter()
+                .map(|byte| byte.count_ones())
+                .sum::<u32>();
+            mem_data.stats.t_init_num_bits = t_num_bits.elapsed().as_secs_f64();
+            if let Some(configured) = &mem_data.cache.bloom {
+                if configured.bloom_filter.number_of_bits()
+                    != bloom_filter.bitmap_bits
+                    || configured.bloom_filter.number_of_hash_functions()
+                        != bloom_filter.k_num
+                {
+                    return Err(MismatchedBloomFilterSettings {
+                        configured_fp_rate: configured.false_positive_rate,
+                        db_fp_rate: bloom_filter.false_positive_rate,
+                        configured_expected_n: configured.expected_num_items,
+                        db_expected_n: bloom_filter.expected_num_items,
+                    }
+                    .into());
+                }
+                if bloom_filter.block_height != block_height {
+                    return Err(MismatchedBloomFilterHeight {
+                        db_height: block_height,
+                        bloom_height: bloom_filter.block_height,
+                    }
+                    .into());
+                }
+                log_chronik!(
+                    "Loaded bloom filter for {:?}, with {num_bits} bits set\n",
+                    self.conf.cf_page_name,
+                );
+                mem_data.cache.bloom = Some(GroupHistoryBloomFilter {
+                    bloom_filter: bloomfilter::Bloom::from_existing(
+                        &bloom_filter.bytes,
+                        bloom_filter.bitmap_bits,
+                        bloom_filter.k_num,
+                        bloom_filter.sip_keys,
+                    ),
+                    false_positive_rate: configured.false_positive_rate,
+                    expected_num_items: configured.expected_num_items,
+                });
+            } else {
+                log!(
+                    "Chronik: Ignoring existing bloom filter in DB (for block \
+                     height {}, {} available bits, {} bits set, {:.1}% fpr, \
+                     {} expected num items); will be wiped at shutdown\n",
+                    bloom_filter.block_height,
+                    bloom_filter.bitmap_bits,
+                    num_bits,
+                    bloom_filter.false_positive_rate * 100.0,
+                    bloom_filter.expected_num_items,
+                );
+            }
+        } else if mem_data.cache.bloom.is_some() && block_height >= 0 {
+            // Nothing stored, yet the DB already has indexed blocks, e.g. the
+            // previous run didn't shut down cleanly, ran with the bloom filter
+            // disabled, or dropped it after a reorg (shutdown then deletes the
+            // stored filter). The configured filter is empty, so it would give
+            // false negatives for members already in the DB, and history would
+            // be written to the wrong page. It can't be used until reindexing.
+            // NOTE(review): relies on `BlockReader::height` returning -1 for
+            // an empty DB — confirm.
+            log!(
+                "Chronik: No bloom filter stored for {:?} although the DB is \
+                 at block height {}; disabling the bloom filter (use \
+                 -chronikreindex to enable it again)\n",
+                self.conf.cf_page_name,
+                block_height,
+            );
+            mem_data.cache.bloom = None;
+            mem_data.stats.n_bloom_num_bytes = 0;
+        }
+        Ok(())
+    }
+
+ /// Write cache data to the DB data at shutdown so we can restore it later.
+ pub fn shutdown(&self, mem_data: &mut GroupHistoryMemData) -> Result<()> {
+ let block_height = BlockReader::new(self.col.db)?.height()?;
+ let mut batch = WriteBatch::default();
+ if let Some(bloom) = &mem_data.cache.bloom {
+ let bloom_filter = SerBloomFilter {
+ block_height,
+ bytes: bloom.bloom_filter.bitmap(),
+ bitmap_bits: bloom.bloom_filter.number_of_bits(),
+ k_num: bloom.bloom_filter.number_of_hash_functions(),
+ sip_keys: bloom.bloom_filter.sip_keys(),
+ false_positive_rate: bloom.false_positive_rate,
+ expected_num_items: bloom.expected_num_items,
+ };
+ let ser_bloom_filter = db_serialize(&bloom_filter)?;
+
+ batch.put_cf(self.col.cf_cache, KEY_CACHE_BLOOM, &ser_bloom_filter);
+ } else {
+ let bloom = self.col.db.get(self.col.cf_cache, KEY_CACHE_BLOOM)?;
+ if bloom.is_some() {
+ log!(
+ "Chronik: Deleting existing bloom filter for {:?}\n",
+ self.conf.cf_page_name,
+ );
+ }
+ // Important: delete bloom filter from DB if unset to prevent false
+ // negatives
+ batch.delete_cf(self.col.cf_cache, KEY_CACHE_BLOOM);
+ }
+ self.col.db.write_batch(batch)?;
+ Ok(())
+ }
+
/// Group the txs, then insert them to into each member of the group.
pub fn insert(
&self,
@@ -213,11 +421,12 @@
) -> Result<()> {
let t_start = Instant::now();
let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?;
- for ((mut new_tx_nums, member_ser), mut num_txs) in fetched
- .grouped_txs
- .into_values()
- .zip(fetched.ser_members)
- .zip(fetched.members_num_txs)
+ for ((mut new_tx_nums, member_ser), (mut num_txs, bloom_result)) in
+ fetched
+ .grouped_txs
+ .into_values()
+ .zip(fetched.ser_members)
+ .zip(fetched.members_num_txs)
{
let mut page_num = num_txs / self.conf.page_size;
let mut last_page_num_txs = num_txs % self.conf.page_size;
@@ -239,6 +448,9 @@
member_ser.as_ref(),
num_txs.to_be_bytes(),
);
+ if matches!(bloom_result, BloomResult::NoHit) {
+ mem_data.cache.add_to_bloom_filter(member_ser.as_ref());
+ }
break;
}
last_page_num_txs = 0;
@@ -258,8 +470,18 @@
mem_data: &mut GroupHistoryMemData,
) -> Result<()> {
let t_start = Instant::now();
+
+ // On reorg, wipe bloom filter. This is because we cannot ever remove
+ // elements from a (normal) bloom filter. In practice, the main
+ // benefit for the bloom filter is during IBD, where no reorgs occur.
+ // If tx volume picks up and reorgs wiping the bloom filter becomes a
+ // problem, we can introduce checkpointing such that recent
+ // bloom filters can be recovered. With the addition of Avalanche, the
+ // deepest reorg depth should only ever be 1.
+ mem_data.cache.bloom = None;
+
let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?;
- for ((mut removed_tx_nums, member_ser), mut num_txs) in fetched
+ for ((mut removed_tx_nums, member_ser), (mut num_txs, _)) in fetched
.grouped_txs
.into_values()
.zip(fetched.ser_members)
@@ -320,7 +542,7 @@
aux: &G::Aux,
mem_data: &mut GroupHistoryMemData,
) -> Result<FetchedNumTxs<'tx, G>> {
- let GroupHistoryMemData { stats } = mem_data;
+ let GroupHistoryMemData { stats, cache } = mem_data;
let t_group = Instant::now();
let grouped_txs = self.group_txs(txs, aux);
stats.t_group += t_group.elapsed().as_secs_f64();
@@ -334,19 +556,45 @@
stats.n_total += grouped_txs.len();
+ let t_bloom = Instant::now();
+ let mut members_num_txs = Vec::with_capacity(ser_members.len());
+ for member_ser in &ser_members {
+ if cache.check_bloom_filter(member_ser.as_ref()) {
+ stats.n_bloom_hits += 1;
+ members_num_txs.push((0, BloomResult::Hit));
+ } else {
+ members_num_txs.push((0, BloomResult::NoHit));
+ }
+ }
+ stats.t_bloom += t_bloom.elapsed().as_secs_f64();
+
let t_fetch = Instant::now();
- let num_txs_keys =
- ser_members.iter().map(|member_ser| member_ser.as_ref());
+ let num_txs_keys = ser_members.iter().zip(&members_num_txs).filter_map(
+ |(member_ser, (_, bloom_result))| match bloom_result {
+ BloomResult::Hit => Some(member_ser.as_ref()),
+ BloomResult::NoHit => None,
+ },
+ );
let fetched_num_txs =
self.col
.db
.multi_get(self.col.cf_num_txs, num_txs_keys, true)?;
- let mut members_num_txs = Vec::with_capacity(fetched_num_txs.len());
- for db_num_txs in fetched_num_txs {
- members_num_txs.push(match db_num_txs {
- Some(db_num_txs) => bytes_to_num_txs(&db_num_txs)?,
- None => 0,
- });
+ stats.n_fetched += fetched_num_txs.len();
+ for ((member_num_txs, _), db_num_txs) in members_num_txs
+ .iter_mut()
+ .filter(|(_, bloom_result)| {
+ matches!(bloom_result, BloomResult::Hit)
+ })
+ .zip(fetched_num_txs)
+ {
+ match db_num_txs {
+ Some(db_num_txs) => {
+ *member_num_txs = bytes_to_num_txs(&db_num_txs)?;
+ }
+ None => {
+ stats.n_bloom_false_positives += 1;
+ }
+ }
}
stats.t_fetch += t_fetch.elapsed().as_secs_f64();
@@ -402,6 +650,53 @@
conf.cf_num_txs_name,
rocksdb::Options::default(),
));
+ columns.push(rocksdb::ColumnFamilyDescriptor::new(
+ conf.cf_cache_name,
+ rocksdb::Options::default(),
+ ));
+ }
+}
+
+impl GroupHistoryMemData {
+ /// Create a new [`GroupHistoryMemData`] using the given
+ /// [`GroupHistorySettings`].
+ pub fn new(settings: GroupHistorySettings) -> Self {
+ let mut stats = GroupHistoryStats::default();
+ let cache = match settings.is_bloom_filter_enabled {
+ false => GroupHistoryCache { bloom: None },
+ true => GroupHistoryCache {
+ bloom: Some({
+ let bloom_filter = bloomfilter::Bloom::new_for_fp_rate(
+ settings.expected_num_items,
+ settings.false_positive_rate,
+ );
+ stats.n_bloom_num_bytes =
+ (bloom_filter.number_of_bits() as usize + 7) / 8;
+ GroupHistoryBloomFilter {
+ bloom_filter,
+ false_positive_rate: settings.false_positive_rate,
+ expected_num_items: settings.expected_num_items,
+ }
+ }),
+ },
+ };
+ GroupHistoryMemData { cache, stats }
+ }
+}
+
+impl GroupHistoryCache {
+ fn check_bloom_filter(&self, member_ser: &[u8]) -> bool {
+ if let Some(bloom) = &self.bloom {
+ bloom.bloom_filter.check(member_ser)
+ } else {
+ true
+ }
+ }
+
+ fn add_to_bloom_filter(&mut self, member_ser: &[u8]) {
+ if let Some(bloom) = &mut self.bloom {
+ bloom.bloom_filter.set(member_ser);
+ }
}
}
@@ -411,6 +706,12 @@
}
}
+impl std::fmt::Debug for GroupHistoryCache {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "GroupHistoryCache {{ .. }}")
+ }
+}
+
impl<'a, G: Group> GroupHistoryReader<'a, G> {
/// Create a new [`GroupHistoryReader`].
pub fn new(db: &'a Db) -> Result<Self> {
diff --git a/chronik/chronik-db/src/mem/data.rs b/chronik/chronik-db/src/mem/data.rs
--- a/chronik/chronik-db/src/mem/data.rs
+++ b/chronik/chronik-db/src/mem/data.rs
@@ -3,8 +3,9 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
use crate::io::{
- GroupHistoryMemData, GroupHistoryStats, GroupUtxoMemData, GroupUtxoStats,
- SpentByMemData, SpentByStats, TxsMemData, TxsStats,
+ GroupHistoryMemData, GroupHistorySettings, GroupHistoryStats,
+ GroupUtxoMemData, GroupUtxoStats, SpentByMemData, SpentByStats, TxsMemData,
+ TxsStats,
};
/// In-memory data for Chronik, e.g. caches, perf statistics.
@@ -35,14 +36,17 @@
/// Config for in-memory data for Chronik.
#[derive(Clone, Debug)]
-pub struct MemDataConf {}
+pub struct MemDataConf {
+ /// Settings for script history
+ pub script_history: GroupHistorySettings,
+}
impl MemData {
/// Create a new [`MemData`] from the given configuration.
- pub fn new(_: MemDataConf) -> Self {
+ pub fn new(conf: MemDataConf) -> Self {
MemData {
txs: TxsMemData::default(),
- script_history: GroupHistoryMemData::default(),
+ script_history: GroupHistoryMemData::new(conf.script_history),
script_utxos: GroupUtxoMemData::default(),
spent_by: SpentByMemData::default(),
}
diff --git a/chronik/chronik-db/src/test/value_group.rs b/chronik/chronik-db/src/test/value_group.rs
--- a/chronik/chronik-db/src/test/value_group.rs
+++ b/chronik/chronik-db/src/test/value_group.rs
@@ -65,6 +65,7 @@
GroupHistoryConf {
cf_page_name: "value_history",
cf_num_txs_name: "value_history_num_txs",
+ cf_cache_name: "value_history_cache",
page_size: 4,
}
}
diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs
--- a/chronik/chronik-indexer/src/indexer.rs
+++ b/chronik/chronik-indexer/src/indexer.rs
@@ -21,9 +21,9 @@
index_tx::prepare_indexed_txs,
io::{
merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter,
- BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData,
- MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry,
- TxReader, TxWriter,
+ BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData,
+ GroupHistorySettings, GroupUtxoMemData, MetadataReader, MetadataWriter,
+ SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter,
},
mem::{MemData, MemDataConf, Mempool, MempoolTx},
};
@@ -42,7 +42,7 @@
subs_group::TxMsgType,
};
-const CURRENT_INDEXER_VERSION: SchemaVersion = 10;
+const CURRENT_INDEXER_VERSION: SchemaVersion = 11;
/// Params for setting up a [`ChronikIndexer`] instance.
#[derive(Clone)]
@@ -55,6 +55,8 @@
pub enable_token_index: bool,
/// Whether to output Chronik performance statistics into a perf/ folder
pub enable_perf_stats: bool,
+ /// Tune script history indexing
+ pub script_history: GroupHistorySettings,
}
/// Struct for indexing blocks and txs. Maintains db handles and mempool.
@@ -180,10 +182,16 @@
verify_schema_version(&db)?;
verify_enable_token_index(&db, params.enable_token_index)?;
let mempool = Mempool::new(ScriptGroup, params.enable_token_index);
+
+ let mut mem_data = MemData::new(MemDataConf {
+ script_history: params.script_history,
+ });
+ let script_history_writer = ScriptHistoryWriter::new(&db, ScriptGroup)?;
+ script_history_writer.init(&mut mem_data.script_history)?;
Ok(ChronikIndexer {
db,
mempool,
- mem_data: MemData::new(MemDataConf {}),
+ mem_data,
script_group: ScriptGroup,
avalanche: Avalanche::default(),
subs: RwLock::new(Subs::new(ScriptGroup)),
@@ -533,6 +541,14 @@
}
}
+ /// Shut down the indexer, i.e. store any pending mem data to disk.
+ pub fn shutdown(&mut self) -> Result<()> {
+ let script_history_writer =
+ ScriptHistoryWriter::new(&self.db, ScriptGroup)?;
+ script_history_writer.shutdown(&mut self.mem_data.script_history)?;
+ Ok(())
+ }
+
/// Return [`QueryBlocks`] to read blocks from the DB.
pub fn blocks(&self) -> QueryBlocks<'_> {
QueryBlocks {
@@ -757,6 +773,7 @@
wipe_db: false,
enable_token_index: false,
enable_perf_stats: false,
+ script_history: Default::default(),
};
// regtest folder doesn't exist yet -> error
assert_eq!(
@@ -825,6 +842,7 @@
wipe_db: false,
enable_token_index: false,
enable_perf_stats: false,
+ script_history: Default::default(),
};
// Setting up DB first time sets the schema version
diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs
--- a/chronik/chronik-lib/src/bridge.rs
+++ b/chronik/chronik-lib/src/bridge.rs
@@ -13,7 +13,7 @@
use abc_rust_error::Result;
use bitcoinsuite_core::tx::{Tx, TxId};
use chronik_bridge::{ffi::init_error, util::expect_unique_ptr};
-use chronik_db::mem::MempoolTx;
+use chronik_db::{io::GroupHistorySettings, mem::MempoolTx};
use chronik_http::server::{
ChronikServer, ChronikServerParams, ChronikSettings,
};
@@ -81,6 +81,11 @@
wipe_db: params.wipe_db,
enable_token_index: params.enable_token_index,
enable_perf_stats: params.enable_perf_stats,
+ script_history: GroupHistorySettings {
+ is_bloom_filter_enabled: params.script_history.is_bloom_enabled,
+ false_positive_rate: params.script_history.false_positive_rate,
+ expected_num_items: params.script_history.expected_num_items,
+ },
})?;
indexer.resync_indexer(bridge_ref)?;
if chronik_bridge::ffi::shutdown_requested() {
@@ -204,6 +209,13 @@
ok_or_abort_node("handle_block_finalized", self.finalize_block(bindex));
}
+ /// Shutdown Chronik gracefully
+ pub fn shutdown(&self) {
+ if let Err(err) = self.indexer.blocking_write().shutdown() {
+ log!("*** CHRONIK ERROR: Shutdown had an error: {err}\n");
+ }
+ }
+
fn add_tx_to_mempool(
&self,
ptx: &ffi::CTransaction,
diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs
--- a/chronik/chronik-lib/src/ffi.rs
+++ b/chronik/chronik-lib/src/ffi.rs
@@ -31,6 +31,19 @@
pub enable_perf_stats: bool,
/// Duration between WebSocket pings initiated by Chronik.
pub ws_ping_interval_secs: u64,
+ /// Settings for indexing script history
+ pub script_history: GroupHistoryParams,
+ }
+
+ /// Configure how to index group history
+ #[derive(Debug)]
+ pub struct GroupHistoryParams {
+ /// Whether to enable the bloom filter optimization
+ pub is_bloom_enabled: bool,
+ /// FP rate of the bloom filter
+ pub false_positive_rate: f64,
+ /// Expected number of items in the group
+ pub expected_num_items: usize,
}
extern "Rust" {
@@ -55,6 +68,7 @@
bindex: &CBlockIndex,
);
fn handle_block_finalized(&self, bindex: &CBlockIndex);
+ fn shutdown(&self);
}
unsafe extern "C++" {
diff --git a/src/init.cpp b/src/init.cpp
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -433,6 +433,7 @@
"-replayprotectionactivationtime",
"-enableminerfund",
"-chronikallowpause",
+ "-chronikscripthistoryexpectednumitems",
// GUI args. These will be overwritten by SetupUIArgs for the GUI
"-allowselfsignedrootcertificates",
"-choosedatadir",
@@ -654,6 +655,11 @@
"Output some performance statistics (e.g. num cache hits, "
"seconds spent) into a <datadir>/perf folder. (default: 0)",
ArgsManager::ALLOW_BOOL, OptionsCategory::CHRONIK);
+ argsman.AddArg("-chronikscripthistorybloomfilter",
+ strprintf("Whether to enable a bloom filter to speed up "
+ "script history (default: %u)",
+ chronik::DEFAULT_SCRIPT_HISTORY_BLOOM_IS_ENABLED),
+ ArgsManager::ALLOW_BOOL, OptionsCategory::CHRONIK);
#endif
argsman.AddArg(
"-blockfilterindex=<type>",
diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py
--- a/test/lint/check-doc.py
+++ b/test/lint/check-doc.py
@@ -48,6 +48,7 @@
"-automaticunparking",
"-avalanchepreconsensus",
"-chronikallowpause",
+ "-chronikscripthistoryexpectednumitems",
"-dbcrashratio",
"-enableminerfund",
"-forcecompactdb",
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Apr 26, 10:43 (5 h, 35 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5573322
Default Alt Text
D15465.id45297.diff (34 KB)
Attached To
D15465: [Chronik] Add an in-memory bloom filter to GroupHistoryWriter
Event Timeline
Log In to Comment