Changeset View
Changeset View
Standalone View
Standalone View
test/functional/rpc_createmultisig.py
#!/usr/bin/env python3 | #!/usr/bin/env python3 | ||||
# Copyright (c) 2015-2019 The Bitcoin Core developers | # Copyright (c) 2015-2019 The Bitcoin Core developers | ||||
# Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
"""Test multisig RPCs""" | """Test multisig RPCs""" | ||||
import binascii | import binascii | ||||
import decimal | import decimal | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import os | import os | ||||
from test_framework.authproxy import JSONRPCException | |||||
from test_framework.descriptors import descsum_create, drop_origins | from test_framework.descriptors import descsum_create, drop_origins | ||||
from test_framework.key import ECPubKey | from test_framework.key import ECPubKey, ECKey | ||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import ( | from test_framework.util import ( | ||||
assert_raises_rpc_error, | assert_raises_rpc_error, | ||||
assert_equal, | assert_equal, | ||||
) | ) | ||||
from test_framework.wallet_util import bytes_to_wif | |||||
class RpcCreateMultiSigTest(BitcoinTestFramework): | class RpcCreateMultiSigTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.setup_clean_chain = True | self.setup_clean_chain = True | ||||
self.num_nodes = 3 | self.num_nodes = 3 | ||||
self.supports_cli = False | self.supports_cli = False | ||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_wallet() | self.skip_if_no_wallet() | ||||
def get_keys(self): | def get_keys(self): | ||||
self.pub = [] | |||||
self.priv = [] | |||||
node0, node1, node2 = self.nodes | node0, node1, node2 = self.nodes | ||||
add = [node1.getnewaddress() for _ in range(self.nkeys)] | for _ in range(self.nkeys): | ||||
self.pub = [node1.getaddressinfo(a)["pubkey"] for a in add] | k = ECKey() | ||||
self.priv = [node1.dumpprivkey(a) for a in add] | k.generate() | ||||
self.pub.append(k.get_pubkey().get_bytes().hex()) | |||||
self.priv.append(bytes_to_wif(k.get_bytes(), k.is_compressed)) | |||||
self.final = node2.getnewaddress() | self.final = node2.getnewaddress() | ||||
def run_test(self): | def run_test(self): | ||||
node0, node1, node2 = self.nodes | node0, node1, node2 = self.nodes | ||||
self.check_addmultisigaddress_errors() | self.check_addmultisigaddress_errors() | ||||
self.log.info('Generating blocks ...') | self.log.info('Generating blocks ...') | ||||
Show All 16 Lines | # Test mixed compressed and uncompressed pubkeys | ||||
pk2 = node2.getaddressinfo(node2.getnewaddress())['pubkey'] | pk2 = node2.getaddressinfo(node2.getnewaddress())['pubkey'] | ||||
# decompress pk2 | # decompress pk2 | ||||
pk_obj = ECPubKey() | pk_obj = ECPubKey() | ||||
pk_obj.set(binascii.unhexlify(pk2)) | pk_obj.set(binascii.unhexlify(pk2)) | ||||
pk_obj.compressed = False | pk_obj.compressed = False | ||||
pk2 = binascii.hexlify(pk_obj.get_bytes()).decode() | pk2 = binascii.hexlify(pk_obj.get_bytes()).decode() | ||||
node0.createwallet(wallet_name='wmulti0', disable_private_keys=True) | |||||
wmulti0 = node0.get_wallet_rpc('wmulti0') | |||||
# Check all permutations of keys because order matters apparently | # Check all permutations of keys because order matters apparently | ||||
for keys in itertools.permutations([pk0, pk1, pk2]): | for keys in itertools.permutations([pk0, pk1, pk2]): | ||||
# Results should be the same as this legacy one | # Results should be the same as this legacy one | ||||
legacy_addr = node0.createmultisig(2, keys)['address'] | legacy_addr = wmulti0.createmultisig(2, keys)['address'] | ||||
assert_equal( | assert_equal( | ||||
legacy_addr, node0.addmultisigaddress( | legacy_addr, wmulti0.addmultisigaddress( | ||||
2, keys, '')['address']) | 2, keys, '')['address']) | ||||
# Generate addresses with the segwit types. These should all make | # Generate addresses with the segwit types. These should all make | ||||
# legacy addresses | # legacy addresses | ||||
assert_equal(legacy_addr, node0.createmultisig(2, keys)['address']) | assert_equal( | ||||
legacy_addr, wmulti0.createmultisig( | |||||
2, keys)['address']) | |||||
self.log.info( | self.log.info( | ||||
'Testing sortedmulti descriptors with BIP 67 test vectors') | 'Testing sortedmulti descriptors with BIP 67 test vectors') | ||||
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/rpc_bip67.json'), encoding='utf-8') as f: | with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data/rpc_bip67.json'), encoding='utf-8') as f: | ||||
vectors = json.load(f) | vectors = json.load(f) | ||||
for t in vectors: | for t in vectors: | ||||
key_str = ','.join(t['keys']) | key_str = ','.join(t['keys']) | ||||
desc = descsum_create('sh(sortedmulti(2,{}))'.format(key_str)) | desc = descsum_create('sh(sortedmulti(2,{}))'.format(key_str)) | ||||
assert_equal(self.nodes[0].deriveaddresses(desc)[0], t['address']) | assert_equal(self.nodes[0].deriveaddresses(desc)[0], t['address']) | ||||
sorted_key_str = ','.join(t['sorted_keys']) | sorted_key_str = ','.join(t['sorted_keys']) | ||||
sorted_key_desc = descsum_create( | sorted_key_desc = descsum_create( | ||||
'sh(multi(2,{}))'.format(sorted_key_str)) | 'sh(multi(2,{}))'.format(sorted_key_str)) | ||||
assert_equal(self.nodes[0].deriveaddresses( | assert_equal(self.nodes[0].deriveaddresses( | ||||
sorted_key_desc)[0], t['address']) | sorted_key_desc)[0], t['address']) | ||||
def check_addmultisigaddress_errors(self): | def check_addmultisigaddress_errors(self): | ||||
if self.options.descriptors: | |||||
return | |||||
self.log.info( | self.log.info( | ||||
'Check that addmultisigaddress fails when the private keys are missing') | 'Check that addmultisigaddress fails when the private keys are missing') | ||||
addresses = [self.nodes[1].getnewaddress( | addresses = [self.nodes[1].getnewaddress( | ||||
address_type='legacy') for _ in range(2)] | address_type='legacy') for _ in range(2)] | ||||
assert_raises_rpc_error(-5, | assert_raises_rpc_error(-5, | ||||
'no full public key for address', | 'no full public key for address', | ||||
lambda: self.nodes[0].addmultisigaddress(nrequired=1, | lambda: self.nodes[0].addmultisigaddress(nrequired=1, | ||||
keys=addresses)) | keys=addresses)) | ||||
Show All 18 Lines | def checkbalances(self): | ||||
assert 150 < height < 350 | assert 150 < height < 350 | ||||
total = 149 * 50000000 + (height - 149 - 100) * 25000000 | total = 149 * 50000000 + (height - 149 - 100) * 25000000 | ||||
assert bal1 == 0 | assert bal1 == 0 | ||||
assert bal2 == self.moved | assert bal2 == self.moved | ||||
assert bal0 + bal1 + bal2 == total | assert bal0 + bal1 + bal2 == total | ||||
def do_multisig(self): | def do_multisig(self): | ||||
node0, node1, node2 = self.nodes | node0, node1, node2 = self.nodes | ||||
if 'wmulti' not in node1.listwallets(): | |||||
try: | |||||
node1.loadwallet('wmulti') | |||||
except JSONRPCException as e: | |||||
if e.error['code'] == - \ | |||||
18 and 'Wallet wmulti not found' in e.error['message']: | |||||
node1.createwallet( | |||||
wallet_name='wmulti', | |||||
disable_private_keys=True) | |||||
else: | |||||
raise | |||||
wmulti = node1.get_wallet_rpc('wmulti') | |||||
# Construct the expected descriptor | # Construct the expected descriptor | ||||
desc = 'multi({},{})'.format(self.nsigs, ','.join(self.pub)) | desc = 'multi({},{})'.format(self.nsigs, ','.join(self.pub)) | ||||
desc = 'sh({})'.format(desc) | desc = 'sh({})'.format(desc) | ||||
desc = descsum_create(desc) | desc = descsum_create(desc) | ||||
msig = node2.createmultisig(self.nsigs, self.pub) | msig = node2.createmultisig(self.nsigs, self.pub) | ||||
madd = msig["address"] | madd = msig["address"] | ||||
mredeem = msig["redeemScript"] | mredeem = msig["redeemScript"] | ||||
assert_equal(desc, msig['descriptor']) | assert_equal(desc, msig['descriptor']) | ||||
# compare against addmultisigaddress | # compare against addmultisigaddress | ||||
msigw = node1.addmultisigaddress(self.nsigs, self.pub, None) | msigw = wmulti.addmultisigaddress(self.nsigs, self.pub, None) | ||||
maddw = msigw["address"] | maddw = msigw["address"] | ||||
mredeemw = msigw["redeemScript"] | mredeemw = msigw["redeemScript"] | ||||
assert_equal(desc, drop_origins(msigw['descriptor'])) | assert_equal(desc, drop_origins(msigw['descriptor'])) | ||||
# addmultisigiaddress and createmultisig work the same | # addmultisigiaddress and createmultisig work the same | ||||
assert maddw == madd | assert maddw == madd | ||||
assert mredeemw == mredeem | assert mredeemw == mredeem | ||||
txid = node0.sendtoaddress(madd, 40000000) | txid = node0.sendtoaddress(madd, 40000000) | ||||
Show All 23 Lines | def do_multisig(self): | ||||
tx = node0.sendrawtransaction(rawtx3["hex"], 0) | tx = node0.sendrawtransaction(rawtx3["hex"], 0) | ||||
blk = node0.generate(1)[0] | blk = node0.generate(1)[0] | ||||
assert tx in node0.getblock(blk)["tx"] | assert tx in node0.getblock(blk)["tx"] | ||||
txinfo = node0.getrawtransaction(tx, True, blk) | txinfo = node0.getrawtransaction(tx, True, blk) | ||||
self.log.info("n/m={}/{} size={}".format(self.nsigs, | self.log.info("n/m={}/{} size={}".format(self.nsigs, | ||||
self.nkeys, txinfo["size"])) | self.nkeys, txinfo["size"])) | ||||
wmulti.unloadwallet() | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
RpcCreateMultiSigTest().main() | RpcCreateMultiSigTest().main() |