Page Menu · Home · Phabricator

D13337.id38596.diff
No One · Temporary

D13337.id38596.diff

diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs
--- a/chronik/chronik-db/src/db.rs
+++ b/chronik/chronik-db/src/db.rs
@@ -13,10 +13,13 @@
use rocksdb::{ColumnFamilyDescriptor, IteratorMode};
use thiserror::Error;
-use crate::io::BlockWriter;
+use crate::io::{BlockWriter, MetadataWriter};
// All column family names used by Chronik should be defined here
-pub(crate) const CF_BLK: &str = "blk";
+/// Column family name for the block data.
+pub const CF_BLK: &str = "blk";
+/// Column family name for db metadata, such as the schema version.
+pub const CF_META: &str = "meta";
pub(crate) type CF = rocksdb::ColumnFamily;
@@ -25,6 +28,7 @@
#[derive(Debug)]
pub struct Db {
db: rocksdb::DB,
+ cf_names: Vec<String>, // names of all CFs the DB was opened with
}
/// Errors indicating something went wrong with the database itself.
@@ -47,6 +51,7 @@
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let mut cfs = Vec::new();
BlockWriter::add_cfs(&mut cfs);
+ MetadataWriter::add_cfs(&mut cfs); // RocksDB requires opening every CF
Self::open_with_cfs(path, cfs)
}
@@ -55,9 +60,10 @@
cfs: Vec<ColumnFamilyDescriptor>,
) -> Result<Self> {
let db_options = Self::db_options();
+ let cf_names = cfs.iter().map(|cf| cf.name().to_string()).collect(); // collect names now; `cfs` is moved into RocksDB below
let db = rocksdb::DB::open_cf_descriptors(&db_options, path, cfs)
.map_err(RocksDb)?;
- Ok(Db { db })
+ Ok(Db { db, cf_names })
}
fn db_options() -> rocksdb::Options {
@@ -81,7 +87,8 @@
Ok(())
}
- pub(crate) fn cf(&self, name: &str) -> Result<&CF> {
+ /// Return a column family handle with the given name.
+ pub fn cf(&self, name: &str) -> Result<&CF> {
Ok(self
.db
.cf_handle(name)
@@ -110,4 +117,52 @@
self.db.write(write_batch).map_err(RocksDb)?;
Ok(())
}
+
+ /// Returns `true` if none of the column families in the DB contain data.
+ ///
+ /// Note: RocksDB refuses to open a DB unless all of its column families are
+ /// specified, so checking `cf_names` covers all of Chronik's column
+ /// families. Iteration stops at the first column family with an entry.
+ pub fn is_db_empty(&self) -> Result<bool> {
+ for cf_name in &self.cf_names {
+ let cf = self.cf(cf_name)?;
+ let mut cf_iter = self.db.full_iterator_cf(cf, IteratorMode::Start);
+ // A single entry in any column family means the DB is not empty.
+ if cf_iter.next().is_some() {
+ return Ok(false);
+ }
+ }
+ Ok(true)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use abc_rust_error::Result;
+ use rocksdb::ColumnFamilyDescriptor;
+
+ use crate::db::{Db, DbError};
+
+ #[test]
+ fn test_must_specify_all_cfs() -> Result<()> {
+ // Ensure rocksdb requires us to always open all CFs
+ let tempdir = tempdir::TempDir::new("chronik-db--specify-all-cfs")?;
+ let opts = rocksdb::Options::default();
+ let cfs = vec![
+ ColumnFamilyDescriptor::new("cf1", opts.clone()),
+ ColumnFamilyDescriptor::new("cf2", opts.clone()),
+ ColumnFamilyDescriptor::new("cf3", opts.clone()),
+ ];
+ Db::open_with_cfs(tempdir.path(), cfs)?;
+ let cfs = vec![ColumnFamilyDescriptor::new("cf1", opts)];
+ assert_eq!(
+ Db::open_with_cfs(tempdir.path(), cfs)
+ .unwrap_err()
+ .downcast::<DbError>()?
+ .to_string(),
+ "RocksDB error: Invalid argument: Column families not opened: \
+ cf3, cf2"
+ );
+ Ok(())
+ }
}
diff --git a/chronik/chronik-db/src/io/metadata.rs b/chronik/chronik-db/src/io/metadata.rs
new file mode 100644
--- /dev/null
+++ b/chronik/chronik-db/src/io/metadata.rs
@@ -0,0 +1,87 @@
+// Copyright (c) 2023 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+use abc_rust_error::Result;
+use rocksdb::ColumnFamilyDescriptor;
+
+use crate::{
+ db::{Db, CF, CF_META},
+ ser::{db_deserialize, db_serialize},
+};
+
+/// Type for the version of the database, to let us know when we're outdated.
+pub type SchemaVersion = u64;
+
+/// Field in the `meta` cf storing the schema version.
+pub const FIELD_SCHEMA_VERSION: &[u8] = b"SCHEMA_VERSION";
+
+/// Writes database metadata into the `meta` column family.
+pub struct MetadataWriter<'a> {
+ cf: &'a CF,
+}
+
+/// Reads database metadata from the `meta` column family.
+pub struct MetadataReader<'a> {
+ db: &'a Db,
+ cf: &'a CF,
+}
+
+impl<'a> MetadataWriter<'a> {
+ /// Create a writer for the `meta` column family of `db`.
+ pub fn new(db: &'a Db) -> Result<Self> {
+ let cf = db.cf(CF_META)?;
+ Ok(MetadataWriter { cf })
+ }
+
+ /// Stage a schema version update in `batch` (stored once the batch is written).
+ pub fn update_schema_version(
+ &self,
+ batch: &mut rocksdb::WriteBatch,
+ schema_version: SchemaVersion,
+ ) -> Result<()> {
+ batch.put_cf(
+ self.cf,
+ FIELD_SCHEMA_VERSION,
+ db_serialize(&schema_version)?,
+ );
+ Ok(())
+ }
+
+ pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
+ columns.push(ColumnFamilyDescriptor::new(
+ CF_META,
+ rocksdb::Options::default(),
+ ));
+ }
+}
+
+impl std::fmt::Debug for MetadataWriter<'_> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "MetadataWriter {{ ... }}")
+ }
+}
+
+impl<'a> MetadataReader<'a> {
+ /// Create a reader for the `meta` column family of `db`.
+ pub fn new(db: &'a Db) -> Result<Self> {
+ let cf = db.cf(CF_META)?;
+ Ok(MetadataReader { db, cf })
+ }
+
+ /// Read the schema version of the database, or `None` if none is stored.
+ pub fn schema_version(&self) -> Result<Option<SchemaVersion>> {
+ match self.db.get(self.cf, FIELD_SCHEMA_VERSION)? {
+ Some(ser_schema_version) => {
+ Ok(Some(db_deserialize(&ser_schema_version)?))
+ }
+ None => Ok(None),
+ }
+ }
+}
+
+impl std::fmt::Debug for MetadataReader<'_> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "MetadataReader {{ ... }}")
+ }
+}
diff --git a/chronik/chronik-db/src/io/mod.rs b/chronik/chronik-db/src/io/mod.rs
--- a/chronik/chronik-db/src/io/mod.rs
+++ b/chronik/chronik-db/src/io/mod.rs
@@ -5,5 +5,7 @@
//! Module containing readers and writers for the database used by Chronik.
mod blocks;
+mod metadata;
pub use self::blocks::*;
+pub use self::metadata::*;
diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs
--- a/chronik/chronik-indexer/src/indexer.rs
+++ b/chronik/chronik-indexer/src/indexer.rs
@@ -11,11 +11,16 @@
use chronik_bridge::{ffi, util::expect_unique_ptr};
use chronik_db::{
db::{Db, WriteBatch},
- io::{BlockHeight, BlockReader, BlockWriter, DbBlock},
+ io::{
+ BlockHeight, BlockReader, BlockWriter, DbBlock, MetadataReader,
+ MetadataWriter, SchemaVersion,
+ },
};
use chronik_util::{log, log_chronik};
use thiserror::Error;
+const CURRENT_INDEXER_VERSION: SchemaVersion = 1; // compared against the DB's stored version by verify_schema_version
+
/// Params for setting up a [`ChronikIndexer`] instance.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ChronikIndexerParams {
@@ -63,6 +68,37 @@
/// Higher height that exists
exists: BlockHeight,
},
+
+ /// The stored schema version could not be read or decoded
+ #[error(
+ "Corrupted schema version in the Chronik database, consider running \
+ -reindex/-chronikreindex"
+ )]
+ CorruptedSchemaVersion,
+
+ /// Missing schema version for non-empty database
+ #[error(
+ "Missing schema version in non-empty Chronik database, consider \
+ running -reindex/-chronikreindex"
+ )]
+ MissingSchemaVersion,
+
+ /// The DB's schema version (payload) is newer than this Chronik's
+ #[error(
+ "Chronik outdated: Chronik has version {}, but the database has \
+ version {0}. Upgrade your node to the appropriate version.",
+ CURRENT_INDEXER_VERSION
+ )]
+ ChronikOutdated(SchemaVersion),
+
+ /// The DB's schema version (payload) is older than this Chronik's
+ #[error(
+ "DB outdated: Chronik has version {}, but the database has version \
+ {0}. -reindex/-chronikreindex to reindex the database to the new \
+ version.",
+ CURRENT_INDEXER_VERSION
+ )]
+ DatabaseOutdated(SchemaVersion),
}
use self::ChronikIndexerError::*;
@@ -83,6 +119,7 @@
}
log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy());
let db = Db::open(&db_path)?;
+ verify_schema_version(&db)?; // rejects incompatible DBs, stamps empty ones
Ok(ChronikIndexer { db })
}
@@ -235,15 +272,57 @@
ChronikBlock { db_block }
}
+/// Check the schema version stored in `db` against `CURRENT_INDEXER_VERSION`.
+///
+/// An empty DB without a schema version gets the current version written.
+/// Errors with `CorruptedSchemaVersion` if the stored version can't be read
+/// or decoded, `ChronikOutdated`/`DatabaseOutdated` if it's newer/older than
+/// the current version, and `MissingSchemaVersion` if a non-empty DB has no
+/// schema version.
+fn verify_schema_version(db: &Db) -> Result<()> {
+ let metadata_reader = MetadataReader::new(db)?;
+ let is_empty = db.is_db_empty()?;
+ log_chronik!("Note: Chronik empty? = {is_empty}\n");
+ match metadata_reader
+ .schema_version()
+ .wrap_err(CorruptedSchemaVersion)?
+ {
+ Some(schema_version) => {
+ assert!(!is_empty, "Empty DB can't have a schema version");
+ if schema_version > CURRENT_INDEXER_VERSION {
+ return Err(ChronikOutdated(schema_version).into());
+ }
+ if schema_version < CURRENT_INDEXER_VERSION {
+ return Err(DatabaseOutdated(schema_version).into());
+ }
+ }
+ None => {
+ if !is_empty {
+ return Err(MissingSchemaVersion.into());
+ }
+ let mut batch = WriteBatch::default();
+ MetadataWriter::new(db)?
+ .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?;
+ db.write_batch(batch)?;
+ }
+ }
+ log!("Chronik has version {CURRENT_INDEXER_VERSION}\n");
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use abc_rust_error::Result;
use bitcoinsuite_core::block::BlockHash;
- use chronik_db::io::{BlockReader, DbBlock};
+ use chronik_db::{
+ db::{Db, WriteBatch, CF_META},
+ io::{BlockReader, DbBlock, MetadataReader, MetadataWriter},
+ };
use pretty_assertions::assert_eq;
use crate::indexer::{
- ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams,
+ ChronikBlock, ChronikIndexer, ChronikIndexerError,
+ ChronikIndexerParams, CURRENT_INDEXER_VERSION,
};
#[test]
@@ -307,4 +380,104 @@
Ok(())
}
+
+ #[test]
+ fn test_schema_version() -> Result<()> {
+ let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?;
+ let chronik_path = dir.path().join("indexes").join("chronik");
+ let params = ChronikIndexerParams {
+ datadir_net: dir.path().to_path_buf(),
+ wipe_db: false,
+ };
+
+ // Setting up DB first time sets the schema version
+ ChronikIndexer::setup(params.clone())?;
+ {
+ let db = Db::open(&chronik_path)?;
+ assert_eq!(
+ MetadataReader::new(&db)?.schema_version()?,
+ Some(CURRENT_INDEXER_VERSION)
+ );
+ }
+ // Opening DB again works fine
+ ChronikIndexer::setup(params.clone())?;
+
+ // Override DB schema version to 0
+ {
+ let db = Db::open(&chronik_path)?;
+ let mut batch = WriteBatch::default();
+ MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?;
+ db.write_batch(batch)?;
+ }
+ // -> DB too old
+ assert_eq!(
+ ChronikIndexer::setup(params.clone())
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::DatabaseOutdated(0),
+ );
+
+ // Override DB schema version to CURRENT_INDEXER_VERSION + 1
+ {
+ let db = Db::open(&chronik_path)?;
+ let mut batch = WriteBatch::default();
+ MetadataWriter::new(&db)?.update_schema_version(
+ &mut batch,
+ CURRENT_INDEXER_VERSION + 1,
+ )?;
+ db.write_batch(batch)?;
+ }
+ // -> Chronik too old
+ assert_eq!(
+ ChronikIndexer::setup(params.clone())
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1),
+ );
+
+ // Corrupt schema version: store bytes that don't decode as a version
+ {
+ let db = Db::open(&chronik_path)?;
+ let cf_meta = db.cf(CF_META)?;
+ let mut batch = WriteBatch::default();
+ batch.put_cf(cf_meta, chronik_db::io::FIELD_SCHEMA_VERSION, [0xff]);
+ db.write_batch(batch)?;
+ }
+ assert_eq!(
+ ChronikIndexer::setup(params)
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::CorruptedSchemaVersion,
+ );
+
+ // New db path, but has existing data
+ let new_dir = dir.path().join("new");
+ let new_chronik_path = new_dir.join("indexes").join("chronik");
+ std::fs::create_dir_all(&new_chronik_path)?;
+ let new_params = ChronikIndexerParams {
+ datadir_net: new_dir,
+ wipe_db: false,
+ };
+ {
+ // new db with obscure field in meta
+ let db = Db::open(&new_chronik_path)?;
+ let mut batch = WriteBatch::default();
+ batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR");
+ db.write_batch(batch)?;
+ }
+ // Error: non-empty DB without schema version
+ assert_eq!(
+ ChronikIndexer::setup(new_params.clone())
+ .unwrap_err()
+ .downcast::<ChronikIndexerError>()?,
+ ChronikIndexerError::MissingSchemaVersion,
+ );
+ // with wipe it works
+ ChronikIndexer::setup(ChronikIndexerParams {
+ wipe_db: true,
+ ..new_params
+ })?;
+
+ Ok(())
+ }
}

File Metadata

Mime Type
text/plain
Expires
Tue, May 20, 20:36 (9 h, 26 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5865897
Default Alt Text
D13337.id38596.diff (13 KB)

Event Timeline