diff --git a/electrum/electrumabc/consolidate.py b/electrum/electrumabc/consolidate.py index 9dd581bd01..525a86dc23 100644 --- a/electrum/electrumabc/consolidate.py +++ b/electrum/electrumabc/consolidate.py @@ -1,146 +1,142 @@ #!/usr/bin/env python3 # Electrum ABC - lightweight eCash client # Copyright (C) 2021 The Electrum ABC developers # # Permission is hereby granted, free of charge, to any person # obtaining a copy of this software and associated documentation files # (the "Software"), to deal in the Software without restriction, # including without limitation the rights to use, copy, modify, merge, # publish, distribute, sublicense, and/or sell copies of the Software, # and to permit persons to whom the Software is furnished to do so, # subject to the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. """ This module provides coin consolidation tools. """ from typing import Iterator, List, Optional, Tuple from . import wallet from .address import Address from .bitcoin import TYPE_ADDRESS from .transaction import Transaction MAX_STANDARD_TX_SIZE: int = 100_000 """Maximum size for transactions that nodes are willing to relay/mine. """ MAX_TX_SIZE: int = 1_000_000 """ Maximum allowed size for a transaction in a block. """ FEERATE: int = 1 """satoshis per byte""" class AddressConsolidator: """Consolidate coins for a single address in a wallet.""" def __init__( self, address: Address, wallet_instance: wallet.AbstractWallet, include_coinbase: bool = True, include_non_coinbase: bool = True, include_frozen: bool = False, include_slp: bool = False, min_value_sats: Optional[int] = None, max_value_sats: Optional[int] = None, min_height: Optional[int] = None, max_height: Optional[int] = None, output_address: Optional[Address] = None, max_tx_size: Optional[int] = MAX_STANDARD_TX_SIZE, ): # output address defaults to input address if unspecified self.output_address = output_address or address self.max_tx_size = max_tx_size assert self.max_tx_size <= MAX_TX_SIZE self._coins = [ utxo for utxo in wallet_instance.get_addr_utxo(address).values() if ( (include_coinbase or not utxo["coinbase"]) and (include_non_coinbase or utxo["coinbase"]) and (include_slp or utxo["slp_token"] is None) and (include_frozen or not utxo["is_frozen_coin"]) and (min_value_sats is None or utxo["value"] >= min_value_sats) and (max_value_sats is None or utxo["value"] <= max_value_sats) and (min_height is None or utxo["height"] >= min_height) and (max_height is None or utxo["height"] <= max_height) ) ] + self.sign_schnorr = wallet_instance.is_schnorr_enabled() + # Add more metadata to coins address_history = wallet_instance.get_address_history(address) received = wallet_instance.get_address_unspent(address, address_history) for coin in self._coins: wallet_instance.add_input_info(coin, received) def get_unsigned_transactions(self) -> List[Transaction]: """ Build as many raw transactions as needed to consolidate the coins. - - :param output_address: Make all transactions send the total amount to this - address. - :param max_tx_size: Maximum tx size in bytes. This is what limits the - number of inputs per transaction. - :return: """ return list(self.iter_transactions()) def iter_transactions(self) -> Iterator[Transaction]: coin_index = 0 while coin_index < len(self._coins): coin_index, tx = self.build_another_transaction(coin_index) yield tx def build_another_transaction(self, coin_index: int) -> Tuple[int, Transaction]: """Build another transaction using coins starting at index coin_index. Return a 2-tuple with the index of the next unused coin and the transaction. """ tx_size = 0 amount = 0 - tx = Transaction(None) + tx = Transaction(None, sign_schnorr=self.sign_schnorr) tx.set_inputs([]) while tx_size < self.max_tx_size and coin_index < len(self._coins): tx_size = self.try_adding_another_coin_to_transaction( tx, self._coins[coin_index], amount + self._coins[coin_index]["value"], ) if tx_size < self.max_tx_size: amount = amount + self._coins[coin_index]["value"] coin_index += 1 return coin_index, tx def try_adding_another_coin_to_transaction( self, tx: Transaction, coin: dict, next_amount: int, ) -> int: """Add coin to tx.inputs() if the resulting tx size is less than max_tx_size. Return the resulting tx_size (no matter if the coin was actually added or not). """ - dummy_tx = Transaction(None) + dummy_tx = Transaction(None, sign_schnorr=self.sign_schnorr) dummy_tx.set_inputs(tx.inputs() + [coin]) dummy_tx.set_outputs([(TYPE_ADDRESS, self.output_address, next_amount)]) tx_size = len(dummy_tx.serialize(estimate_size=True)) // 2 if tx_size < self.max_tx_size: tx.add_inputs([coin]) tx.set_outputs( [(TYPE_ADDRESS, self.output_address, next_amount - tx_size * FEERATE)] ) return tx_size diff --git a/electrum/electrumabc/tests/test_consolidate.py b/electrum/electrumabc/tests/test_consolidate.py index d39d37b973..753c8d9d24 100644 --- a/electrum/electrumabc/tests/test_consolidate.py +++ b/electrum/electrumabc/tests/test_consolidate.py @@ -1,240 +1,256 @@ import math import unittest from unittest.mock import Mock from .. import consolidate from ..address import Address TEST_ADDRESS: Address = Address.from_string( "ecash:qr3l6uufcuwm9prgpa6cfxnez87fzstxesp7ugp0ez" ) FEERATE: int = 1 """Satoshis per byte""" class TestConsolidateCoinSelection(unittest.TestCase): def setUp(self) -> None: coins = {} tx_history = [] i = 0 for is_coinbase in (True, False): for is_frozen_coin in (True, False): for slp in (None, "not None"): coins[f"dummy_txid:{i}"] = { "address": TEST_ADDRESS, "prevout_n": i, "prevout_hash": "a" * 64, "height": 700_000 + i, "value": 1000 + i, "coinbase": is_coinbase, "is_frozen_coin": is_frozen_coin, "slp_token": slp, "type": "p2pkh", } i += 1 # noqa: SIM113 tx_history.append(("a" * 64, 1)) self.mock_wallet = Mock() + self.mock_wallet.is_schnorr_enabled.return_value = False self.mock_wallet.get_addr_utxo.return_value = coins self.mock_wallet.get_address_history.return_value = tx_history self.mock_wallet.get_address_history.return_value = tx_history self.mock_wallet.get_txin_type.return_value = "p2pkh" # mock for self.wallet.txo.get(tx_hash, {}).get(address, []) # returns list of (prevout_n, value, is_coinbase) self.mock_wallet.txo = Mock() txo_get_return = Mock() txo_get_return.get.return_value = [ ( i, 1000 + i, True, ) for i in range(len(coins)) ] self.mock_wallet.txo.get.return_value = txo_get_return self.mock_wallet.get_address_index.return_value = True, 0 self.mock_wallet.keystore.get_xpubkey.return_value = "dummy" def test_coin_selection(self) -> None: for incl_coinbase in (True, False): for incl_noncoinbase in (True, False): for incl_frozen in (True, False): for incl_slp in (True, False): consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, incl_coinbase, incl_noncoinbase, incl_frozen, incl_slp, ) for coin in consolidator._coins: if not incl_coinbase: self.assertFalse(coin["coinbase"]) if not incl_noncoinbase: self.assertTrue(coin["coinbase"]) if not incl_frozen: self.assertFalse(coin["is_frozen_coin"]) if not incl_slp: self.assertIsNone(coin["slp_token"]) # test minimum and maximum value consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_value_sats=1003, max_value_sats=None, ) for coin in consolidator._coins: self.assertGreaterEqual(coin["value"], 1003) self.assertEqual(len(consolidator._coins), 5) consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_value_sats=None, max_value_sats=1005, ) for coin in consolidator._coins: self.assertLessEqual(coin["value"], 1005) self.assertEqual(len(consolidator._coins), 6) consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_value_sats=1003, max_value_sats=1005, ) for coin in consolidator._coins: self.assertGreaterEqual(coin["value"], 1003) self.assertLessEqual(coin["value"], 1005) self.assertEqual(len(consolidator._coins), 3) # test minimum and maximum height consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_height=None, max_height=700_005, ) for coin in consolidator._coins: self.assertLessEqual(coin["height"], 700_005) self.assertEqual(len(consolidator._coins), 6) consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_height=700_003, max_height=None, ) for coin in consolidator._coins: self.assertGreaterEqual(coin["height"], 700_003) self.assertEqual(len(consolidator._coins), 5) consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_height=700_003, max_height=700_005, ) for coin in consolidator._coins: self.assertGreaterEqual(coin["height"], 700_003) self.assertLessEqual(coin["height"], 700_005) self.assertEqual(len(consolidator._coins), 3) # Filter both on height and value consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_value_sats=1004, max_value_sats=1006, min_height=700_003, max_height=700_005, ) for coin in consolidator._coins: self.assertGreaterEqual(coin["value"], 1003) self.assertLessEqual(coin["value"], 1006) self.assertGreaterEqual(coin["height"], 700_003) self.assertLessEqual(coin["height"], 700_005) self.assertEqual(len(consolidator._coins), 2) - def test_get_unsigned_transactions(self): + def test_get_unsigned_transactions_schnorr(self): + self._test_get_unsigned_transactions(sign_schnorr=True) + + def test_get_unsigned_transactions_ecdsa(self): + self._test_get_unsigned_transactions(sign_schnorr=False) + + def _test_get_unsigned_transactions(self, sign_schnorr: bool): + self.mock_wallet.is_schnorr_enabled.return_value = sign_schnorr + n_coins = 8 min_value = 1000 max_value = 1007 for max_tx_size in range(200, 1500, 100): # select all coins consolidator = consolidate.AddressConsolidator( TEST_ADDRESS, self.mock_wallet, True, True, True, True, min_value_sats=None, max_value_sats=None, output_address=TEST_ADDRESS, max_tx_size=max_tx_size, ) self.assertEqual(n_coins, len(consolidator._coins)) txs = consolidator.get_unsigned_transactions() for tx in txs: self.assertLess(len(tx.serialize(estimate_size=True)) // 2, max_tx_size) - # tx size is roughly 148 * n_in + 34 * n_out + 10 - expected_max_n_inputs_for_size = math.floor((max_tx_size - 44) / 148) + # txid(32) + prevout_n(4) + compact_size (1) + scriptsig + sequence (4) + # with scriptsig: compact_size(1) + pubkey(33) + compact_size(1) + sig + # with sig 65 (schnorr) or ~72 (ecdsa) + input_approx_size = 148 if not sign_schnorr else 141 + # tx size is: input_size * n_in + 34 * n_out + 10 + # with n_out = 1 for consolidation transactions + expected_max_n_inputs_for_size = math.floor( + (max_tx_size - 44) / input_approx_size + ) self.assertEqual( len(txs), math.ceil(n_coins / expected_max_n_inputs_for_size) ) # Check the fee and amount total_input_value = 0 total_output_value = 0 total_fee = 0 total_size = 0 for tx in txs: tx_size = len(tx.serialize(estimate_size=True)) // 2 self.assertEqual(tx.get_fee(), tx_size * FEERATE) total_fee += tx.get_fee() total_input_value += tx.input_value() total_output_value += tx.output_value() total_size += tx_size self.assertEqual(total_input_value, sum(range(min_value, max_value + 1))) self.assertEqual(total_output_value, total_input_value - total_fee) self.assertEqual(total_fee, total_size * FEERATE) if __name__ == "__main__": unittest.main()