Changeset View
Changeset View
Standalone View
Standalone View
test/functional/wallet_descriptor.py
#!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2019 The Bitcoin Core developers | # Copyright (c) 2019 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
"""Test descriptor wallet function.""" | """Test descriptor wallet function.""" | ||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import assert_equal, assert_raises_rpc_error | from test_framework.util import assert_equal, assert_raises_rpc_error | ||||
class WalletDescriptorTest(BitcoinTestFramework): | class WalletDescriptorTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.setup_clean_chain = True | self.setup_clean_chain = True | ||||
self.num_nodes = 1 | self.num_nodes = 1 | ||||
self.extra_args = [['-keypool=100']] | self.extra_args = [["-keypool=100"]] | ||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_wallet() | self.skip_if_no_wallet() | ||||
def run_test(self): | def run_test(self): | ||||
# Make a descriptor wallet | # Make a descriptor wallet | ||||
self.log.info("Making a descriptor wallet") | self.log.info("Making a descriptor wallet") | ||||
self.nodes[0].createwallet(wallet_name="desc1", descriptors=True) | self.nodes[0].createwallet(wallet_name="desc1", descriptors=True) | ||||
self.nodes[0].unloadwallet(self.default_wallet_name) | self.nodes[0].unloadwallet(self.default_wallet_name) | ||||
# A descriptor wallet should have 100 addresses = 100 keys | # A descriptor wallet should have 100 addresses = 100 keys | ||||
self.log.info("Checking wallet info") | self.log.info("Checking wallet info") | ||||
wallet_info = self.nodes[0].getwalletinfo() | wallet_info = self.nodes[0].getwalletinfo() | ||||
assert_equal(wallet_info['keypoolsize'], 100) | assert_equal(wallet_info["keypoolsize"], 100) | ||||
assert_equal(wallet_info['keypoolsize_hd_internal'], 100) | assert_equal(wallet_info["keypoolsize_hd_internal"], 100) | ||||
assert 'keypoololdest' not in wallet_info | assert "keypoololdest" not in wallet_info | ||||
# Check that getnewaddress works | # Check that getnewaddress works | ||||
self.log.info("Test that getnewaddress and getrawchangeaddress work") | self.log.info("Test that getnewaddress and getrawchangeaddress work") | ||||
addr = self.nodes[0].getnewaddress("", "legacy") | addr = self.nodes[0].getnewaddress("", "legacy") | ||||
addr_info = self.nodes[0].getaddressinfo(addr) | addr_info = self.nodes[0].getaddressinfo(addr) | ||||
assert addr_info['desc'].startswith('pkh(') | assert addr_info["desc"].startswith("pkh(") | ||||
assert_equal(addr_info['hdkeypath'], 'm/44\'/1\'/0\'/0/0') | assert_equal(addr_info["hdkeypath"], "m/44'/1'/0'/0/0") | ||||
# Check that getrawchangeaddress works | # Check that getrawchangeaddress works | ||||
addr = self.nodes[0].getrawchangeaddress() | addr = self.nodes[0].getrawchangeaddress() | ||||
addr_info = self.nodes[0].getaddressinfo(addr) | addr_info = self.nodes[0].getaddressinfo(addr) | ||||
assert addr_info['desc'].startswith('pkh(') | assert addr_info["desc"].startswith("pkh(") | ||||
assert_equal(addr_info['hdkeypath'], 'm/44\'/1\'/0\'/1/0') | assert_equal(addr_info["hdkeypath"], "m/44'/1'/0'/1/0") | ||||
# Make a wallet to receive coins at | # Make a wallet to receive coins at | ||||
self.nodes[0].createwallet(wallet_name="desc2", descriptors=True) | self.nodes[0].createwallet(wallet_name="desc2", descriptors=True) | ||||
recv_wrpc = self.nodes[0].get_wallet_rpc("desc2") | recv_wrpc = self.nodes[0].get_wallet_rpc("desc2") | ||||
send_wrpc = self.nodes[0].get_wallet_rpc("desc1") | send_wrpc = self.nodes[0].get_wallet_rpc("desc1") | ||||
# Generate some coins | # Generate some coins | ||||
self.generatetoaddress(self.nodes[0], 101, send_wrpc.getnewaddress()) | self.generatetoaddress(self.nodes[0], 101, send_wrpc.getnewaddress()) | ||||
# Make transactions | # Make transactions | ||||
self.log.info("Test sending and receiving") | self.log.info("Test sending and receiving") | ||||
addr = recv_wrpc.getnewaddress() | addr = recv_wrpc.getnewaddress() | ||||
send_wrpc.sendtoaddress(addr, 10) | send_wrpc.sendtoaddress(addr, 10) | ||||
# Make sure things are disabled | # Make sure things are disabled | ||||
self.log.info("Test disabled RPCs") | self.log.info("Test disabled RPCs") | ||||
assert_raises_rpc_error(-4, | assert_raises_rpc_error( | ||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.importprivkey, | recv_wrpc.rpc.importprivkey, | ||||
"cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW") | "cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW", | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.importpubkey, | recv_wrpc.rpc.importpubkey, | ||||
send_wrpc.getaddressinfo(send_wrpc.getnewaddress())) | send_wrpc.getaddressinfo(send_wrpc.getnewaddress()), | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.importaddress, | recv_wrpc.rpc.importaddress, | ||||
recv_wrpc.getnewaddress()) | recv_wrpc.getnewaddress(), | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.importmulti, | recv_wrpc.rpc.importmulti, | ||||
[]) | [], | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.addmultisigaddress, | recv_wrpc.rpc.addmultisigaddress, | ||||
1, | 1, | ||||
[recv_wrpc.getnewaddress()]) | [recv_wrpc.getnewaddress()], | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.dumpprivkey, | recv_wrpc.rpc.dumpprivkey, | ||||
recv_wrpc.getnewaddress()) | recv_wrpc.getnewaddress(), | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.dumpwallet, | recv_wrpc.rpc.dumpwallet, | ||||
'wallet.dump') | "wallet.dump", | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.importwallet, | recv_wrpc.rpc.importwallet, | ||||
'wallet.dump') | "wallet.dump", | ||||
assert_raises_rpc_error(-4, | ) | ||||
assert_raises_rpc_error( | |||||
-4, | |||||
"This type of wallet does not support this command", | "This type of wallet does not support this command", | ||||
recv_wrpc.rpc.sethdseed) | recv_wrpc.rpc.sethdseed, | ||||
) | |||||
self.log.info("Test encryption") | self.log.info("Test encryption") | ||||
# Get the master fingerprint before encrypt | # Get the master fingerprint before encrypt | ||||
info1 = send_wrpc.getaddressinfo(send_wrpc.getnewaddress()) | info1 = send_wrpc.getaddressinfo(send_wrpc.getnewaddress()) | ||||
# Encrypt wallet 0 | # Encrypt wallet 0 | ||||
send_wrpc.encryptwallet('pass') | send_wrpc.encryptwallet("pass") | ||||
send_wrpc.walletpassphrase('pass', 10) | send_wrpc.walletpassphrase("pass", 10) | ||||
addr = send_wrpc.getnewaddress() | addr = send_wrpc.getnewaddress() | ||||
info2 = send_wrpc.getaddressinfo(addr) | info2 = send_wrpc.getaddressinfo(addr) | ||||
assert info1['hdmasterfingerprint'] != info2['hdmasterfingerprint'] | assert info1["hdmasterfingerprint"] != info2["hdmasterfingerprint"] | ||||
send_wrpc.walletlock() | send_wrpc.walletlock() | ||||
assert 'hdmasterfingerprint' in send_wrpc.getaddressinfo( | assert "hdmasterfingerprint" in send_wrpc.getaddressinfo( | ||||
send_wrpc.getnewaddress()) | send_wrpc.getnewaddress() | ||||
) | |||||
info3 = send_wrpc.getaddressinfo(addr) | info3 = send_wrpc.getaddressinfo(addr) | ||||
assert_equal(info2['desc'], info3['desc']) | assert_equal(info2["desc"], info3["desc"]) | ||||
self.log.info( | self.log.info( | ||||
"Test that getnewaddress still works after keypool is exhausted in an encrypted wallet") | "Test that getnewaddress still works after keypool is exhausted in an" | ||||
" encrypted wallet" | |||||
) | |||||
for _ in range(500): | for _ in range(500): | ||||
send_wrpc.getnewaddress() | send_wrpc.getnewaddress() | ||||
self.log.info( | self.log.info( | ||||
"Test that unlock is needed when deriving only hardened keys in an encrypted wallet") | "Test that unlock is needed when deriving only hardened keys in an" | ||||
send_wrpc.walletpassphrase('pass', 10) | " encrypted wallet" | ||||
send_wrpc.importdescriptors([{ | ) | ||||
send_wrpc.walletpassphrase("pass", 10) | |||||
send_wrpc.importdescriptors( | |||||
[ | |||||
{ | |||||
"desc": "sh(pkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h))#45ls09gz", | "desc": "sh(pkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h))#45ls09gz", | ||||
"timestamp": "now", | "timestamp": "now", | ||||
"range": [0, 10], | "range": [0, 10], | ||||
"active": True | "active": True, | ||||
}]) | } | ||||
] | |||||
) | |||||
send_wrpc.walletlock() | send_wrpc.walletlock() | ||||
# Exhaust keypool of 100 | # Exhaust keypool of 100 | ||||
for _ in range(100): | for _ in range(100): | ||||
send_wrpc.getnewaddress() | send_wrpc.getnewaddress() | ||||
# This should now error | # This should now error | ||||
assert_raises_rpc_error(-12, | assert_raises_rpc_error( | ||||
-12, | |||||
"Keypool ran out, please call keypoolrefill first", | "Keypool ran out, please call keypoolrefill first", | ||||
send_wrpc.getnewaddress, | send_wrpc.getnewaddress, | ||||
'') | "", | ||||
) | |||||
self.log.info("Test born encrypted wallets") | self.log.info("Test born encrypted wallets") | ||||
self.nodes[0].createwallet( | self.nodes[0].createwallet("desc_enc", False, False, "pass", False, True) | ||||
'desc_enc', False, False, 'pass', False, True) | enc_rpc = self.nodes[0].get_wallet_rpc("desc_enc") | ||||
enc_rpc = self.nodes[0].get_wallet_rpc('desc_enc') | |||||
# Makes sure that we can get a new address from a born encrypted wallet | # Makes sure that we can get a new address from a born encrypted wallet | ||||
enc_rpc.getnewaddress() | enc_rpc.getnewaddress() | ||||
self.log.info("Test blank descriptor wallets") | self.log.info("Test blank descriptor wallets") | ||||
self.nodes[0].createwallet( | self.nodes[0].createwallet( | ||||
wallet_name='desc_blank', | wallet_name="desc_blank", blank=True, descriptors=True | ||||
blank=True, | ) | ||||
descriptors=True) | blank_rpc = self.nodes[0].get_wallet_rpc("desc_blank") | ||||
blank_rpc = self.nodes[0].get_wallet_rpc('desc_blank') | assert_raises_rpc_error( | ||||
assert_raises_rpc_error(-4, | -4, "This wallet has no available keys", blank_rpc.getnewaddress | ||||
'This wallet has no available keys', | ) | ||||
blank_rpc.getnewaddress) | |||||
self.log.info("Test descriptor wallet with disabled private keys") | self.log.info("Test descriptor wallet with disabled private keys") | ||||
self.nodes[0].createwallet( | self.nodes[0].createwallet( | ||||
wallet_name='desc_no_priv', | wallet_name="desc_no_priv", disable_private_keys=True, descriptors=True | ||||
disable_private_keys=True, | ) | ||||
descriptors=True) | nopriv_rpc = self.nodes[0].get_wallet_rpc("desc_no_priv") | ||||
nopriv_rpc = self.nodes[0].get_wallet_rpc('desc_no_priv') | assert_raises_rpc_error( | ||||
assert_raises_rpc_error(-4, | -4, "This wallet has no available keys", nopriv_rpc.getnewaddress | ||||
'This wallet has no available keys', | ) | ||||
nopriv_rpc.getnewaddress) | |||||
if __name__ == '__main__': | if __name__ == "__main__": | ||||
WalletDescriptorTest().main() | WalletDescriptorTest().main() |