Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14864571
D13337.id38596.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
D13337.id38596.diff
View Options
diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs
--- a/chronik/chronik-db/src/db.rs
+++ b/chronik/chronik-db/src/db.rs
@@ -13,10 +13,13 @@
use rocksdb::{ColumnFamilyDescriptor, IteratorMode};
use thiserror::Error;
-use crate::io::BlockWriter;
+use crate::io::{BlockWriter, MetadataWriter};
// All column family names used by Chronik should be defined here
-pub(crate) const CF_BLK: &str = "blk";
+/// Column family name for the block data.
+pub const CF_BLK: &str = "blk";
+/// Column family name for db metadata.
+pub const CF_META: &str = "meta";
pub(crate) type CF = rocksdb::ColumnFamily;
@@ -25,6 +28,7 @@
#[derive(Debug)]
pub struct Db {
db: rocksdb::DB,
+ cf_names: Vec<String>,
}
/// Errors indicating something went wrong with the database itself.
@@ -47,6 +51,7 @@
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let mut cfs = Vec::new();
BlockWriter::add_cfs(&mut cfs);
+ MetadataWriter::add_cfs(&mut cfs);
Self::open_with_cfs(path, cfs)
}
@@ -55,9 +60,10 @@
cfs: Vec<ColumnFamilyDescriptor>,
) -> Result<Self> {
let db_options = Self::db_options();
+ let cf_names = cfs.iter().map(|cf| cf.name().to_string()).collect();
let db = rocksdb::DB::open_cf_descriptors(&db_options, path, cfs)
.map_err(RocksDb)?;
- Ok(Db { db })
+ Ok(Db { db, cf_names })
}
fn db_options() -> rocksdb::Options {
@@ -81,7 +87,8 @@
Ok(())
}
- pub(crate) fn cf(&self, name: &str) -> Result<&CF> {
+ /// Return a column family handle with the given name.
+ pub fn cf(&self, name: &str) -> Result<&CF> {
Ok(self
.db
.cf_handle(name)
@@ -110,4 +117,52 @@
self.db.write(write_batch).map_err(RocksDb)?;
Ok(())
}
+
+    /// Whether the DB is empty, i.e. no column family contains any entry.
+    ///
+    /// Checks every column family this `Db` was opened with and stops at the
+    /// first one that yields an entry. RocksDB requires all column families
+    /// to be opened (see `test_must_specify_all_cfs`), so none is missed.
+    pub fn is_db_empty(&self) -> Result<bool> {
+        for cf_name in &self.cf_names {
+            let cf = self.cf(cf_name)?;
+            let mut cf_iter = self.db.full_iterator_cf(cf, IteratorMode::Start);
+            // Only the presence of a first item matters, not its contents.
+            if cf_iter.next().is_some() {
+                return Ok(false);
+            }
+        }
+        Ok(true)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use abc_rust_error::Result;
+ use rocksdb::ColumnFamilyDescriptor;
+
+ use crate::db::{Db, DbError};
+
+    /// RocksDB must refuse to open an existing DB unless every one of its
+    /// column families is listed.
+    #[test]
+    fn test_must_specify_all_cfs() -> Result<()> {
+        let tempdir = tempdir::TempDir::new("chronik-db--specify-all-cfs")?;
+        let opts = rocksdb::Options::default();
+
+        // The first open creates the DB with three column families.
+        let all_cfs = ["cf1", "cf2", "cf3"]
+            .iter()
+            .map(|name| ColumnFamilyDescriptor::new(*name, opts.clone()))
+            .collect::<Vec<_>>();
+        Db::open_with_cfs(tempdir.path(), all_cfs)?;
+
+        // Re-opening with only one of them has to fail.
+        let only_cf1 = vec![ColumnFamilyDescriptor::new("cf1", opts)];
+        let err = Db::open_with_cfs(tempdir.path(), only_cf1)
+            .unwrap_err()
+            .downcast::<DbError>()?;
+        assert_eq!(
+            err.to_string(),
+            "RocksDB error: Invalid argument: Column families not opened: \
+             cf3, cf2"
+        );
+        Ok(())
+    }
}
diff --git a/chronik/chronik-db/src/io/metadata.rs b/chronik/chronik-db/src/io/metadata.rs
new file mode 100644
--- /dev/null
+++ b/chronik/chronik-db/src/io/metadata.rs
@@ -0,0 +1,87 @@
+// Copyright (c) 2023 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+use abc_rust_error::Result;
+use rocksdb::ColumnFamilyDescriptor;
+
+use crate::{
+ db::{Db, CF, CF_META},
+ ser::{db_deserialize, db_serialize},
+};
+
+/// Type for the version of the database schema, to let us know when either
+/// the database or the running software is outdated.
+///
+/// Stored in the `meta` column family under [`FIELD_SCHEMA_VERSION`].
+pub type SchemaVersion = u64;
+
+/// Key in the `meta` column family under which the serialized
+/// [`SchemaVersion`] is stored.
+pub const FIELD_SCHEMA_VERSION: &[u8] = b"SCHEMA_VERSION";
+
+/// Writes database metadata (currently only the schema version) into the
+/// `meta` column family.
+///
+/// Writes are staged into a caller-provided `rocksdb::WriteBatch`; this type
+/// only holds the column family handle.
+pub struct MetadataWriter<'a> {
+ /// Handle of the `meta` column family, borrowed from the [`Db`].
+ cf: &'a CF,
+}
+
+/// Reads database metadata (currently only the schema version) from the
+/// `meta` column family.
+pub struct MetadataReader<'a> {
+ /// Database the metadata is read from.
+ db: &'a Db,
+ /// Handle of the `meta` column family, borrowed from `db`.
+ cf: &'a CF,
+}
+
+impl<'a> MetadataWriter<'a> {
+    /// Build a metadata writer bound to the `meta` column family of `db`.
+    ///
+    /// Fails if the column family handle cannot be obtained from `db`.
+    pub fn new(db: &'a Db) -> Result<Self> {
+        let meta_cf = db.cf(CF_META)?;
+        Ok(Self { cf: meta_cf })
+    }
+
+    /// Stage a write of `schema_version` into `batch`.
+    ///
+    /// Nothing is persisted until the caller writes `batch` to the database.
+    pub fn update_schema_version(
+        &self,
+        batch: &mut rocksdb::WriteBatch,
+        schema_version: SchemaVersion,
+    ) -> Result<()> {
+        let ser_version = db_serialize(&schema_version)?;
+        batch.put_cf(self.cf, FIELD_SCHEMA_VERSION, ser_version);
+        Ok(())
+    }
+
+    /// Append the descriptor of the `meta` column family (with default
+    /// options) to `columns`.
+    pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
+        let meta_options = rocksdb::Options::default();
+        columns.push(ColumnFamilyDescriptor::new(CF_META, meta_options));
+    }
+}
+
+/// Opaque `Debug` output: the column family handle is not printed.
+impl std::fmt::Debug for MetadataWriter<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("MetadataWriter { ... }")
+    }
+}
+
+impl<'a> MetadataReader<'a> {
+    /// Build a metadata reader bound to the `meta` column family of `db`.
+    ///
+    /// Fails if the column family handle cannot be obtained from `db`.
+    pub fn new(db: &'a Db) -> Result<Self> {
+        let meta_cf = db.cf(CF_META)?;
+        Ok(Self { db, cf: meta_cf })
+    }
+
+    /// Look up the schema version stored in the database.
+    ///
+    /// Returns `Ok(None)` if no schema version entry exists, and an error if
+    /// the lookup fails or the stored bytes cannot be deserialized.
+    pub fn schema_version(&self) -> Result<Option<SchemaVersion>> {
+        let version = match self.db.get(self.cf, FIELD_SCHEMA_VERSION)? {
+            None => return Ok(None),
+            Some(ser_version) => db_deserialize(&ser_version)?,
+        };
+        Ok(Some(version))
+    }
+}
+
+/// Opaque `Debug` output: neither the database nor the column family handle
+/// is printed.
+impl std::fmt::Debug for MetadataReader<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("MetadataReader { ... }")
+    }
+}
diff --git a/chronik/chronik-db/src/io/mod.rs b/chronik/chronik-db/src/io/mod.rs
--- a/chronik/chronik-db/src/io/mod.rs
+++ b/chronik/chronik-db/src/io/mod.rs
@@ -5,5 +5,7 @@
//! Module containing readers and writers for the database used by Chronik.
mod blocks;
+mod metadata;
pub use self::blocks::*;
+pub use self::metadata::*;
diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs
--- a/chronik/chronik-indexer/src/indexer.rs
+++ b/chronik/chronik-indexer/src/indexer.rs
@@ -11,11 +11,16 @@
use chronik_bridge::{ffi, util::expect_unique_ptr};
use chronik_db::{
db::{Db, WriteBatch},
- io::{BlockHeight, BlockReader, BlockWriter, DbBlock},
+ io::{
+ BlockHeight, BlockReader, BlockWriter, DbBlock, MetadataReader,
+ MetadataWriter, SchemaVersion,
+ },
};
use chronik_util::{log, log_chronik};
use thiserror::Error;
+/// Schema version this build of the indexer works with. It is written to
+/// empty databases and compared against the version stored in existing ones
+/// when the database is opened.
+const CURRENT_INDEXER_VERSION: SchemaVersion = 1;
+
/// Params for setting up a [`ChronikIndexer`] instance.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ChronikIndexerParams {
@@ -63,6 +68,37 @@
/// Higher height that exists
exists: BlockHeight,
},
+
+ /// Corrupted schema version
+ #[error(
+ "Corrupted schema version in the Chronik database, consider running \
+ -reindex/-chronikreindex"
+ )]
+ CorruptedSchemaVersion,
+
+ /// Missing schema version for non-empty database
+ #[error(
+ "Missing schema version in non-empty Chronik database, consider \
+ running -reindex/-chronikreindex"
+ )]
+ MissingSchemaVersion,
+
+ /// This Chronik instance is outdated
+ #[error(
+ "Chronik outdated: Chronik has version {}, but the database has \
+ version {0}. Upgrade your node to the appropriate version.",
+ CURRENT_INDEXER_VERSION
+ )]
+ ChronikOutdated(SchemaVersion),
+
+ /// Database is outdated
+ #[error(
+ "DB outdated: Chronik has version {}, but the database has version \
+ {0}. -reindex/-chronikreindex to reindex the database to the new \
+ version.",
+ CURRENT_INDEXER_VERSION
+ )]
+ DatabaseOutdated(SchemaVersion),
}
use self::ChronikIndexerError::*;
@@ -83,6 +119,7 @@
}
log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy());
let db = Db::open(&db_path)?;
+ verify_schema_version(&db)?;
Ok(ChronikIndexer { db })
}
@@ -235,15 +272,51 @@
ChronikBlock { db_block }
}
+/// Check that the schema version stored in `db` matches
+/// [`CURRENT_INDEXER_VERSION`], initializing it if the DB is empty.
+///
+/// - Stored version equals the current one: OK.
+/// - Stored version is higher: `ChronikOutdated`; lower: `DatabaseOutdated`.
+/// - Reading the stored version fails: `CorruptedSchemaVersion`.
+/// - No stored version: the current version is written if the DB is empty,
+///   otherwise `MissingSchemaVersion` is returned.
+///
+/// # Panics
+///
+/// Panics if a schema version is found while the DB reports being empty.
+fn verify_schema_version(db: &Db) -> Result<()> {
+    let reader = MetadataReader::new(db)?;
+    let writer = MetadataWriter::new(db)?;
+    let is_empty = db.is_db_empty()?;
+    log_chronik!("Note: Chronik empty? = {is_empty}\n");
+    let stored_version =
+        reader.schema_version().wrap_err(CorruptedSchemaVersion)?;
+    match stored_version {
+        Some(db_version) => {
+            assert!(!is_empty, "Empty DB can't have a schema version");
+            match db_version.cmp(&CURRENT_INDEXER_VERSION) {
+                std::cmp::Ordering::Greater => {
+                    return Err(ChronikOutdated(db_version).into());
+                }
+                std::cmp::Ordering::Less => {
+                    return Err(DatabaseOutdated(db_version).into());
+                }
+                std::cmp::Ordering::Equal => {}
+            }
+        }
+        // Fresh database: stamp it with the current schema version.
+        None if is_empty => {
+            let mut batch = WriteBatch::default();
+            writer.update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?;
+            db.write_batch(batch)?;
+        }
+        None => return Err(MissingSchemaVersion.into()),
+    }
+    log!("Chronik has version {CURRENT_INDEXER_VERSION}\n");
+    Ok(())
+}
+
#[cfg(test)]
mod tests {
use abc_rust_error::Result;
use bitcoinsuite_core::block::BlockHash;
- use chronik_db::io::{BlockReader, DbBlock};
+ use chronik_db::{
+ db::{Db, WriteBatch, CF_META},
+ io::{BlockReader, DbBlock, MetadataReader, MetadataWriter},
+ };
use pretty_assertions::assert_eq;
use crate::indexer::{
- ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams,
+ ChronikBlock, ChronikIndexer, ChronikIndexerError,
+ ChronikIndexerParams, CURRENT_INDEXER_VERSION,
};
#[test]
@@ -307,4 +380,104 @@
Ok(())
}
+
+ /// Walks one on-disk database through the outcomes of the schema version
+ /// check performed by `ChronikIndexer::setup`: fresh DB, matching version,
+ /// too old, too new, corrupted, and missing on a non-empty DB.
+ ///
+ /// The steps share the same database directory, so their order matters.
+ #[test]
+ fn test_schema_version() -> Result<()> {
+ let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?;
+ let chronik_path = dir.path().join("indexes").join("chronik");
+ let params = ChronikIndexerParams {
+ datadir_net: dir.path().to_path_buf(),
+ wipe_db: false,
+ };
+
+ // Setting up DB first time sets the schema version
+ ChronikIndexer::setup(params.clone())?;
+ {
+ let db = Db::open(&chronik_path)?;
+ assert_eq!(
+ MetadataReader::new(&db)?.schema_version()?,
+ Some(CURRENT_INDEXER_VERSION)
+ );
+ }
+ // Opening DB again works fine
+ ChronikIndexer::setup(params.clone())?;
+
+ // Override DB schema version to 0
+ {
+ let db = Db::open(&chronik_path)?;
+ let mut batch = WriteBatch::default();
+ MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?;
+ db.write_batch(batch)?;
+ }
+ // -> DB too old
+ assert_eq!(
+ ChronikIndexer::setup(params.clone())
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::DatabaseOutdated(0),
+ );
+
+ // Override DB schema version to CURRENT_INDEXER_VERSION + 1
+ {
+ let db = Db::open(&chronik_path)?;
+ let mut batch = WriteBatch::default();
+ MetadataWriter::new(&db)?.update_schema_version(
+ &mut batch,
+ CURRENT_INDEXER_VERSION + 1,
+ )?;
+ db.write_batch(batch)?;
+ }
+ // -> Chronik too old
+ assert_eq!(
+ ChronikIndexer::setup(params.clone())
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1),
+ );
+
+ // Corrupt schema version
+ // A single 0xff byte is written straight under the schema version key,
+ // bypassing MetadataWriter; setup is expected to report it as corrupted.
+ {
+ let db = Db::open(&chronik_path)?;
+ let cf_meta = db.cf(CF_META)?;
+ let mut batch = WriteBatch::default();
+ batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]);
+ db.write_batch(batch)?;
+ }
+ assert_eq!(
+ ChronikIndexer::setup(params)
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::CorruptedSchemaVersion,
+ );
+
+ // New db path, but has existing data
+ let new_dir = dir.path().join("new");
+ let new_chronik_path = new_dir.join("indexes").join("chronik");
+ // NOTE(review): presumably needed because the DB is opened directly
+ // below instead of through setup -- confirm.
+ std::fs::create_dir_all(&new_chronik_path)?;
+ let new_params = ChronikIndexerParams {
+ datadir_net: new_dir,
+ wipe_db: false,
+ };
+ {
+ // new db with obscure field in meta
+ let db = Db::open(&new_chronik_path)?;
+ let mut batch = WriteBatch::default();
+ batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR");
+ db.write_batch(batch)?;
+ }
+ // Error: non-empty DB without schema version
+ assert_eq!(
+ ChronikIndexer::setup(new_params.clone())
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::MissingSchemaVersion,
+ );
+ // with wipe it works
+ ChronikIndexer::setup(ChronikIndexerParams {
+ wipe_db: true,
+ ..new_params
+ })?;
+
+ Ok(())
+ }
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, May 20, 20:36 (14 h, 21 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5865897
Default Alt Text
D13337.id38596.diff (13 KB)
Attached To
D13337: [Chronik] Add metadata and schema version to DB
Event Timeline
Log In to Comment