diff --git a/chronik/bitcoinsuite-core/src/tx/tx.rs b/chronik/bitcoinsuite-core/src/tx/tx.rs --- a/chronik/bitcoinsuite-core/src/tx/tx.rs +++ b/chronik/bitcoinsuite-core/src/tx/tx.rs @@ -170,6 +170,12 @@ } } +impl From<Tx> for TxMut { + fn from(value: Tx) -> Self { + value.tx + } +} + impl BitcoinSer for TxMut { fn ser_to<S: BitcoinSerializer>(&self, bytes: &mut S) { self.version.ser_to(bytes); diff --git a/chronik/chronik-db/src/io/group_history.rs b/chronik/chronik-db/src/io/group_history.rs --- a/chronik/chronik-db/src/io/group_history.rs +++ b/chronik/chronik-db/src/io/group_history.rs @@ -460,7 +460,10 @@ } } -fn key_for_member_page(member_ser: &[u8], page_num: PageNum) -> Vec<u8> { +pub(crate) fn key_for_member_page( + member_ser: &[u8], + page_num: PageNum, +) -> Vec<u8> { [member_ser, &page_num.to_be_bytes()].concat() } diff --git a/chronik/chronik-db/src/io/upgrade.rs b/chronik/chronik-db/src/io/upgrade.rs --- a/chronik/chronik-db/src/io/upgrade.rs +++ b/chronik/chronik-db/src/io/upgrade.rs @@ -2,41 +2,70 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. +use std::collections::HashSet; + use abc_rust_error::Result; use bitcoinsuite_core::{ - script::{opcode::OP_RETURN, write_var_int, COMPRESS_NUM_SPECIAL_SCRIPTS}, - tx::Tx, + script::{ + compress_script_variant, + opcode::{OP_CHECKSIG, OP_RETURN}, + write_var_int, PubKey, UncompressedPubKey, + COMPRESS_NUM_SPECIAL_SCRIPTS, + }, + tx::{Tx, TxMut}, }; use bitcoinsuite_slp::slp::consts::{SLP_LOKAD_ID, TOKEN_TYPE_V2}; use bytes::{BufMut, BytesMut}; -use chronik_util::log; +use chronik_util::{log, log_chronik}; use rocksdb::WriteBatch; +use thiserror::Error; use crate::{ - db::{Db, CF, CF_SCRIPT_HISTORY_NUM_TXS}, + db::{ + Db, CF, CF_SCRIPT_HISTORY, CF_SCRIPT_HISTORY_NUM_TXS, CF_SCRIPT_UTXO, + }, groups::ScriptHistoryReader, index_tx::prepare_indexed_txs, io::{ + key_for_member_page, token::{ TokenIndexError::{BlockNotFound, TokenTxNumNotFound}, TokenReader, TokenWriter, }, BlockReader, TxReader, + UpgradeError::*, }, }; /// Perform upgrades in the DB. pub struct UpgradeWriter<'a> { db: &'a Db, + cf_script_utxo: &'a CF, + cf_script_history: &'a CF, cf_script_history_num_txs: &'a CF, } +/// Error indicating something went wrong with upgrading the DB. +#[derive(Debug, Error, PartialEq, Eq)] +pub enum UpgradeError { + /// Ambiguous P2PK script upgrade + #[error( + "Upgrade failed: {0} ambiguous scripts to upgrade (should be \ + impossible), use -chronikreindex to fix or contact the devs" + )] + AmbiguousP2pkScriptUpgrade(usize), +} + impl<'a> UpgradeWriter<'a> { /// Create a new UpgradeWriter pointing to the Db pub fn new(db: &'a Db) -> Result<Self> { + let cf_script_utxo = db.cf(CF_SCRIPT_UTXO)?; + let cf_script_history = db.cf(CF_SCRIPT_HISTORY)?; let cf_script_history_num_txs = db.cf(CF_SCRIPT_HISTORY_NUM_TXS)?; Ok(UpgradeWriter { db, + cf_script_utxo, + cf_script_history, cf_script_history_num_txs, }) } @@ -137,6 +166,212 @@ Ok(()) } + + /// Fix incorrectly compressed P2PK scripts + pub fn fix_p2pk_compression( + &self, + load_tx: impl Fn(u32, u32, u32) -> Result<Tx>, + shutdown_requested: impl Fn() -> bool, + ) -> Result<()> { + let script_history_reader = ScriptHistoryReader::new(self.db)?; + let block_reader = BlockReader::new(self.db)?; + let tx_reader = TxReader::new(self.db)?; + + // Iterate over all scripts in the DB + let iterator = self.db.full_iterator(self.cf_script_history_num_txs); + + // Compress the script as P2PK (otherwise return None), either with the + // bug enabled or disabled. + fn compress_p2pk_script( + uncompressed_script: &[u8], + with_bug: bool, + ) -> Option<[u8; 33]> { + const COMP_PK_SIZE: u8 = PubKey::SIZE as u8; + const UNCOMP_PK_SIZE: u8 = UncompressedPubKey::SIZE as u8; + const OP_CHECKSIG: u8 = OP_CHECKSIG::N; + + match uncompressed_script { + [COMP_PK_SIZE, pubkey @ .., OP_CHECKSIG] + if pubkey.len() == PubKey::SIZE => + { + if !with_bug && pubkey[0] != 0x02 && pubkey[0] != 0x03 { + return None; + } + Some(pubkey.try_into().unwrap()) + } + [UNCOMP_PK_SIZE, pubkey @ .., OP_CHECKSIG] + if pubkey.len() == UncompressedPubKey::SIZE => + { + if !with_bug && pubkey[0] != 0x04 { + return None; + } + let mut bytes = [0; 33]; + bytes[0] = (pubkey[64] & 0x01) | 0x04; + bytes[1..].copy_from_slice(&pubkey[1..][..32]); + Some(bytes) + } + _ => None, + } + } + + let mut reached_uncompressed_pks = false; + let mut num_scanned = 0; + #[allow(clippy::mutable_key_type)] + let mut scripts_not_to_upgrade = HashSet::new(); + #[allow(clippy::mutable_key_type)] + let mut scripts_to_upgrade = HashSet::new(); + let mut txs_to_upgrade = HashSet::new(); + log!("Fixing P2PK scripts. This may take a while on slow hardware\n"); + for entry in iterator { + let (key, _) = entry?; + num_scanned += 1; + // Before keys start with 0x04, we encounter scripts very rarely. + // After, they are quite frequent, so we log more often so it + // doesn't seem like the upgrade got stuck. + if (!reached_uncompressed_pks && num_scanned % 1000000 == 0) + || (reached_uncompressed_pks && num_scanned % 10000 == 0) + { + log!( + "Scanned {num_scanned} scripts, found {} to upgrade\n", + scripts_to_upgrade.len(), + ); + } + if !reached_uncompressed_pks && key[0] == 0x04 { + reached_uncompressed_pks = true; + } + if num_scanned % 1000 == 0 && shutdown_requested() { + log!("Cancelled upgrade\n"); + return Ok(()); + } + + if key.len() != 33 { + // Only 33 byte long keys are affected + continue; + } + if key[0] == 0x02 || key[0] == 0x03 { + // These are definitely correct + continue; + } + + let (num_pages, _) = + script_history_reader.member_num_pages_and_txs(&key)?; + + // Iterate through all pages of the Script + for page_num in 0..num_pages as u32 { + let Some(txs) = + script_history_reader.page_txs(&key, page_num)? + else { + break; + }; + + // Iterate through all txs of the Script + for tx_num in txs { + // Load tx from the node + let block_tx = tx_reader + .tx_by_tx_num(tx_num)? + .ok_or(TokenTxNumNotFound(tx_num))?; + let block = block_reader + .by_height(block_tx.block_height)? + .ok_or(BlockNotFound(block_tx.block_height))?; + let tx_entry = block_tx.entry; + let tx = TxMut::from(load_tx( + block.file_num, + tx_entry.data_pos, + tx_entry.undo_pos, + )?); + + // Iterate all outputs. We skip iterating inputs, as scripts + // for which the compression is buggy are unspendable. + for output in tx.outputs { + let with_bug = compress_p2pk_script( + output.script.bytecode(), + true, + ); + let Some(with_bug) = with_bug else { continue }; + if key.as_ref() != with_bug.as_ref() { + // Not the script we were looking for + continue; + } + let without_bug = compress_p2pk_script( + output.script.bytecode(), + false, + ); + if without_bug == Some(with_bug) { + // No bug + scripts_not_to_upgrade.insert(output.script); + } else { + // Buggy compression + scripts_to_upgrade.insert(output.script); + txs_to_upgrade.insert(tx_entry.txid); + } + } + } + } + } + + for &txid in &txs_to_upgrade { + log_chronik!("Tx {txid} has incorrectly compressed P2PK script\n"); + } + + log!("Upgrading {} scripts\n", scripts_to_upgrade.len()); + log_chronik!( + "Skipping upgrade on {} scripts\n", + scripts_not_to_upgrade.len() + ); + let num_scripts_both = scripts_to_upgrade + .intersection(&scripts_not_to_upgrade) + .count(); + log_chronik!("There's {num_scripts_both} ambiguous scripts\n"); + + if num_scripts_both > 0 { + // Should be impossible; but we better handle the error. + return Err(AmbiguousP2pkScriptUpgrade(num_scripts_both).into()); + } + + let mut batch = WriteBatch::default(); + for script in &scripts_to_upgrade { + let with_bug = compress_p2pk_script(script.bytecode(), true) + .expect("Impossible"); + let script_variant = script.variant(); + let without_bug = compress_script_variant(&script_variant); + let (num_pages, _) = + script_history_reader.member_num_pages_and_txs(&with_bug)?; + if let Some(value) = self.db.get(self.cf_script_utxo, with_bug)? { + batch.delete_cf(self.cf_script_utxo, with_bug); + batch.put_cf(self.cf_script_utxo, &without_bug, value); + } + if let Some(value) = + self.db.get(self.cf_script_history_num_txs, with_bug)? + { + batch.delete_cf(self.cf_script_history_num_txs, with_bug); + batch.put_cf( + self.cf_script_history_num_txs, + &without_bug, + value, + ); + } + for page_num in 0..num_pages as u32 { + let key_with_bug = key_for_member_page(&with_bug, page_num); + let key_without_bug = + key_for_member_page(&without_bug, page_num); + if let Some(value) = + self.db.get(self.cf_script_history, &key_with_bug)? + { + batch.delete_cf(self.cf_script_history, &key_with_bug); + batch.put_cf( + self.cf_script_history, + &key_without_bug, + value, + ); + } + } + } + + log!("Writing {} updates to fix P2PK compression\n", batch.len()); + self.db.write_batch(batch)?; + + Ok(()) + } } impl std::fmt::Debug for UpgradeWriter<'_> { diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -240,6 +240,7 @@ pub fn setup( params: ChronikIndexerParams, load_tx: impl Fn(u32, u32, u32) -> Result<Tx>, + shutdown_requested: impl Fn() -> bool, ) -> Result<Self> { let indexes_path = params.datadir_net.join("indexes"); let perf_path = params.datadir_net.join("perf"); @@ -272,7 +273,9 @@ schema_version, params.enable_token_index, &load_tx, + &shutdown_requested, )?; + let plugin_name_map = update_plugins_index( &db, ¶ms.plugin_ctx, @@ -1094,6 +1097,7 @@ mut schema_version: u64, enable_token_index: bool, load_tx: impl Fn(u32, u32, u32) -> Result<Tx>, + shutdown_requested: impl Fn() -> bool, ) -> Result<()> { // DB has version 10, upgrade to 11 if schema_version == 10 { @@ -1110,7 +1114,12 @@ // if schema_version == 12 { // UPGRADE13: UNCOMMENT // UPGRADE13: COMMENT NEXT LINE if false { - upgrade_12_to_13(db, enable_token_index, &load_tx)?; + upgrade_12_to_13( + db, + enable_token_index, + &load_tx, + &shutdown_requested, + )?; } Ok(()) } @@ -1153,12 +1162,14 @@ db: &Db, enable_token_index: bool, load_tx: impl Fn(u32, u32, u32) -> Result<Tx>, + shutdown_requested: impl Fn() -> bool, ) -> Result<()> { log!("Upgrading Chronik DB from version 12 to 13...\n"); + let upgrade_writer = UpgradeWriter::new(db)?; if enable_token_index { - let upgrade_writer = UpgradeWriter::new(db)?; - upgrade_writer.fix_mint_vault_txs(load_tx)?; + upgrade_writer.fix_mint_vault_txs(&load_tx)?; } + upgrade_writer.fix_p2pk_compression(&load_tx, &shutdown_requested)?; let mut batch = WriteBatch::default(); let metadata_writer = MetadataWriter::new(db)?; metadata_writer.update_schema_version(&mut batch, 13)?; @@ -1408,6 +1419,7 @@ use bitcoinsuite_core::tx::{Tx, TxId, TxMut}; let load_tx = |_, _, _| unreachable!(); + let shutdown_requested = || false; let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); @@ -1422,7 +1434,7 @@ }; // regtest folder doesn't exist yet -> error assert_eq!( - ChronikIndexer::setup(params.clone(), load_tx) + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::CreateDirFailed(datadir_net.join("indexes")), @@ -1430,7 +1442,8 @@ // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; - let mut indexer = ChronikIndexer::setup(params.clone(), load_tx)?; + let mut indexer = + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested)?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); @@ -1478,6 +1491,7 @@ ..params }, load_tx, + shutdown_requested, )?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); @@ -1487,6 +1501,7 @@ #[test] fn test_schema_version() -> Result<()> { let load_tx = |_, _, _| unreachable!(); + let shutdown_requested = || false; let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { @@ -1500,7 +1515,7 @@ }; // Setting up DB first time sets the schema version - ChronikIndexer::setup(params.clone(), load_tx)?; + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested)?; { let db = Db::open(&chronik_path)?; assert_eq!( @@ -1509,7 +1524,7 @@ ); } // Opening DB again works fine - ChronikIndexer::setup(params.clone(), load_tx)?; + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested)?; // Override DB schema version to 0 { @@ -1520,7 +1535,7 @@ } // -> DB too old assert_eq!( - ChronikIndexer::setup(params.clone(), load_tx) + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::DatabaseOutdated(0), @@ -1538,7 +1553,7 @@ } // -> Chronik too old assert_eq!( - ChronikIndexer::setup(params.clone(), load_tx) + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1), @@ -1553,7 +1568,7 @@ db.write_batch(batch)?; } assert_eq!( - ChronikIndexer::setup(params.clone(), load_tx) + ChronikIndexer::setup(params.clone(), load_tx, shutdown_requested) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::CorruptedSchemaVersion, @@ -1577,9 +1592,13 @@ } // Error: non-empty DB without schema version assert_eq!( - ChronikIndexer::setup(new_params.clone(), load_tx) - .unwrap_err() - .downcast::<ChronikIndexerError>()?, + ChronikIndexer::setup( + new_params.clone(), + load_tx, + shutdown_requested + ) + .unwrap_err() + .downcast::<ChronikIndexerError>()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works @@ -1589,6 +1608,7 @@ ..new_params }, load_tx, + shutdown_requested, )?; Ok(()) diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -106,6 +106,7 @@ |file_num, data_pos, undo_pos| { Ok(Tx::from(bridge_ref.load_tx(file_num, data_pos, undo_pos)?)) }, + || bridge_ref.shutdown_requested(), )?; indexer.resync_indexer(bridge_ref)?; if bridge.shutdown_requested() {