Page MenuHomePhabricator

No OneTemporary

diff --git a/test/functional/rpc_bind.py b/test/functional/rpc_bind.py
index 14a343ad2e..b8b5264fb5 100755
--- a/test/functional/rpc_bind.py
+++ b/test/functional/rpc_bind.py
@@ -1,134 +1,143 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
# Test for -rpcbind, as well as -rpcallowip and -rpcconnect
from platform import uname
+import socket
+import sys
+from test_framework.netutil import addr_to_hex, all_interfaces, get_bind_addrs
from test_framework.test_framework import BitcoinTestFramework, SkipTest
-from test_framework.util import *
-from test_framework.netutil import *
+from test_framework.util import (
+ assert_equal,
+ assert_raises_rpc_error,
+ get_datadir_path,
+ get_rpc_proxy,
+ rpc_port,
+ rpc_url,
+)
class RPCBindTest(BitcoinTestFramework):
def set_test_params(self):
    '''Set the framework parameters: one node, `setup_clean_chain` enabled.'''
    self.num_nodes = 1
    self.setup_clean_chain = True
def setup_network(self):
    '''
    Only register the nodes via add_nodes(); the individual sub-tests call
    start_node()/start_nodes() themselves with their own arguments.
    '''
    node_count = self.num_nodes
    self.add_nodes(node_count, None)
def run_bind_test(self, allow_ips, connect_to, addresses, expected):
    '''
    Launch node 0 with the requested -rpcallowip / -rpcbind options and
    verify that the set of addresses it is bound to equals the expected set.

    allow_ips  -- iterable of -rpcallowip values; falsy means "pass none"
    connect_to -- "host" or "host:port", stored on the node object before
                  it is started
    addresses  -- iterable of -rpcbind values
    expected   -- iterable of (address, port) pairs that must be bound
    '''
    self.log.info("Bind test for {}".format(str(addresses)))
    wanted = [(addr_to_hex(host), port) for host, port in expected]

    node_args = ['-disablewallet', '-nolisten']
    if allow_ips:
        node_args.extend('-rpcallowip=' + ip for ip in allow_ips)
    node_args.extend('-rpcbind=' + bind_addr for bind_addr in addresses)

    node = self.nodes[0]
    host_and_port = connect_to.split(':')
    if len(host_and_port) != 2:
        # No explicit port (or a bracketed IPv6 literal, which splits into
        # more than two pieces): use the node's default RPC port.
        node.host = connect_to
        node.rpc_port = rpc_port(node.index)
    else:
        node.host, node.rpc_port = host_and_port

    self.start_node(0, node_args)
    bound = set(get_bind_addrs(self.nodes[0].process.pid))
    assert_equal(bound, set(wanted))
    self.stop_nodes()
def run_allowip_test(self, allow_ips, rpchost, rpcport):
    '''
    Start node 0 with the given -rpcallowip values, then issue
    getnetworkinfo through an RPC proxy pointed at rpchost:rpcport.

    Any error raised by the RPC call propagates to the caller.
    '''
    self.log.info("Allow IP test for {}:{}".format(rpchost, rpcport))
    node_args = ['-disablewallet', '-nolisten']
    node_args.extend('-rpcallowip=' + ip for ip in allow_ips)
    self.nodes[0].host = None
    self.start_nodes([node_args])
    # connect to node through non-loopback interface
    datadir = get_datadir_path(self.options.tmpdir, 0)
    proxy = get_rpc_proxy(
        rpc_url(datadir, rpchost, rpcport),
        0,
        coveragedir=self.options.coveragedir)
    proxy.getnetworkinfo()
    self.stop_nodes()
def run_test(self):
    '''
    Run the -rpcbind / -rpcallowip scenarios.

    Raises SkipTest when the platform is not Linux, when running under
    WSL, when no non-loopback IPv4 interface exists, or when an IPv6
    datagram socket cannot be connected to ::1.
    '''
    # due to OS-specific network stats queries, this test works only on Linux
    if not sys.platform.startswith('linux'):
        raise SkipTest("This test can only be run on linux.")
    # WSL is currently not supported (refer to
    # https://reviews.bitcoinabc.org/T400 for details).
    # This condition should be removed once netstat support is provided by
    # Microsoft.
    if "microsoft" in uname().version.lower():
        raise SkipTest(
            "Running this test on WSL is currently not supported")

    # find the first non-loopback interface for testing
    non_loopback_ip = next(
        (ip for _name, ip in all_interfaces() if ip != '127.0.0.1'), None)
    if non_loopback_ip is None:
        raise SkipTest(
            "This test requires at least one non-loopback IPv4 interface.")

    # Probe for IPv6 support. The context manager guarantees the probe
    # socket is closed (the previous code referenced `s.close` without
    # calling it, so the socket was never closed explicitly).
    try:
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as probe:
            probe.connect(("::1", 1))
    except OSError:
        raise SkipTest("This test requires IPv6 support.")

    self.log.info("Using interface {} for testing".format(non_loopback_ip))

    defaultport = rpc_port(0)

    # check default without rpcallowip (IPv4 and IPv6 localhost)
    self.run_bind_test(None, '127.0.0.1', [],
                       [('127.0.0.1', defaultport), ('::1', defaultport)])
    # check default with rpcallowip (IPv6 any)
    self.run_bind_test(['127.0.0.1'], '127.0.0.1', [],
                       [('::0', defaultport)])
    # check only IPv4 localhost (explicit)
    self.run_bind_test(['127.0.0.1'], '127.0.0.1', ['127.0.0.1'],
                       [('127.0.0.1', defaultport)])
    # check only IPv4 localhost (explicit) with alternative port
    self.run_bind_test(
        ['127.0.0.1'], '127.0.0.1:32171', ['127.0.0.1:32171'],
        [('127.0.0.1', 32171)])
    # check only IPv4 localhost (explicit) with multiple alternative ports
    # on same host
    self.run_bind_test(
        ['127.0.0.1'], '127.0.0.1:32171', [
            '127.0.0.1:32171', '127.0.0.1:32172'],
        [('127.0.0.1', 32171), ('127.0.0.1', 32172)])
    # check only IPv6 localhost (explicit)
    self.run_bind_test(['[::1]'], '[::1]', ['[::1]'],
                       [('::1', defaultport)])
    # check both IPv4 and IPv6 localhost (explicit)
    self.run_bind_test(['127.0.0.1'], '127.0.0.1', ['127.0.0.1', '[::1]'],
                       [('127.0.0.1', defaultport), ('::1', defaultport)])
    # check only non-loopback interface
    self.run_bind_test(
        [non_loopback_ip], non_loopback_ip, [non_loopback_ip],
        [(non_loopback_ip, defaultport)])

    # With the interface's own address in rpcallowip the request succeeds
    self.run_allowip_test([non_loopback_ip], non_loopback_ip, defaultport)
    # Check that with invalid rpcallowip, we are denied
    assert_raises_rpc_error(-342, "non-JSON HTTP response with '403 Forbidden' from server",
                            self.run_allowip_test, ['1.1.1.1'], non_loopback_ip, defaultport)
if __name__ == '__main__':
RPCBindTest().main()
diff --git a/test/functional/rpc_blockchain.py b/test/functional/rpc_blockchain.py
index a50f3185dc..4e00ec1bff 100755
--- a/test/functional/rpc_blockchain.py
+++ b/test/functional/rpc_blockchain.py
@@ -1,243 +1,243 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test RPCs related to blockchainstate.
Test the following RPCs:
- gettxoutsetinfo
- getdifficulty
- getbestblockhash
- getblockhash
- getblockheader
- getchaintxstats
- getnetworkhashps
- verifychain
Tests correspond to code in rpc/blockchain.cpp.
"""
from decimal import Decimal
import http.client
import subprocess
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_raises,
assert_raises_rpc_error,
- assert_is_hex_string,
assert_is_hash_string,
+ assert_is_hex_string,
)
class BlockchainTest(BitcoinTestFramework):
def set_test_params(self):
    """Set the framework parameters: one node started with
    -stopatheight=207 and -prune=1."""
    node_args = ['-stopatheight=207', '-prune=1']
    self.extra_args = [node_args]
    self.num_nodes = 1
def run_test(self):
    """Run every sub-test in a fixed order, then require verifychain(4, 0)
    to return a truthy value."""
    steps = (
        self._test_getblockchaininfo,
        self._test_getchaintxstats,
        self._test_gettxoutsetinfo,
        self._test_getblockheader,
        self._test_getdifficulty,
        self._test_getnetworkhashps,
        self._test_stopatheight,
        self._test_getblock,
    )
    for step in steps:
        step()

    assert self.nodes[0].verifychain(4, 0)
def _test_getblockchaininfo(self):
    """Check the key set returned by getblockchaininfo, first with the
    node's initial arguments and again after a restart with only
    -stopatheight=207."""
    self.log.info("Test getblockchaininfo")

    # Kept in sorted order: the second assertion compares against this
    # list directly.
    default_keys = [
        'bestblockhash',
        'blocks',
        'chain',
        'chainwork',
        'difficulty',
        'headers',
        'mediantime',
        'pruned',
        'softforks',
        'verificationprogress',
    ]

    info = self.nodes[0].getblockchaininfo()
    # result should have pruneheight and default keys if pruning is enabled
    with_prune_keys = sorted(['pruneheight'] + default_keys)
    assert_equal(sorted(info.keys()), with_prune_keys)
    # pruneheight should be greater or equal to 0
    assert info['pruneheight'] >= 0

    self.restart_node(0, ['-stopatheight=207'])
    info = self.nodes[0].getblockchaininfo()
    # should have exact keys
    assert_equal(sorted(info.keys()), default_keys)
def _test_getchaintxstats(self):
    """Check getchaintxstats with an explicit window of 1, with defaults,
    anchored at block 1, and with an out-of-range block count."""
    node = self.nodes[0]

    stats = node.getchaintxstats(1)
    # 200 txs plus genesis tx
    assert_equal(stats['txcount'], 201)
    # tx rate should be 1 per 10 minutes, or 1/600
    # we have to round because of binary math
    assert_equal(round(stats['txrate'] * 600, 10), Decimal(1))

    first_block = node.getblock(node.getblockhash(1))
    tip_block = node.getblock(node.getblockhash(200))
    window_seconds = tip_block['mediantime'] - first_block['mediantime']

    stats = node.getchaintxstats()
    assert_equal(stats['time'], tip_block['time'])
    assert_equal(stats['txcount'], 201)
    assert_equal(stats['window_block_count'], 199)
    assert_equal(stats['window_tx_count'], 199)
    assert_equal(stats['window_interval'], window_seconds)
    assert_equal(
        round(stats['txrate'] * window_seconds, 10), Decimal(199))

    stats = node.getchaintxstats(blockhash=first_block['hash'])
    assert_equal(stats['time'], first_block['time'])
    assert_equal(stats['txcount'], 2)
    assert_equal(stats['window_block_count'], 0)
    # With an empty window the rate-related fields must be omitted.
    for absent_key in ('window_tx_count', 'window_interval', 'txrate'):
        assert absent_key not in stats

    assert_raises_rpc_error(
        -8, "Invalid block count: should be between 0 and the block's height - 1",
        node.getchaintxstats, 201)
def _test_gettxoutsetinfo(self):
    """Check gettxoutsetinfo at height 200, after invalidating block 1
    (genesis only), and again after reconsidering block 1.

    The last stage asserts that the result equals the one obtained before
    the invalidate/reconsider round trip.
    """
    node = self.nodes[0]
    res = node.gettxoutsetinfo()

    assert_equal(res['total_amount'], Decimal('8725.00000000'))
    assert_equal(res['transactions'], 200)
    assert_equal(res['height'], 200)
    assert_equal(res['txouts'], 200)
    # (a stray trailing comma used to turn this statement into a tuple)
    assert_equal(res['bogosize'], 17000)
    assert_equal(res['bestblock'], node.getblockhash(200))
    size = res['disk_size']
    assert size > 6400
    assert size < 64000
    assert_equal(len(res['bestblock']), 64)
    assert_equal(len(res['hash_serialized']), 64)

    self.log.info(
        "Test that gettxoutsetinfo() works for blockchain with just the genesis block")
    b1hash = node.getblockhash(1)
    node.invalidateblock(b1hash)

    res2 = node.gettxoutsetinfo()
    assert_equal(res2['transactions'], 0)
    assert_equal(res2['total_amount'], Decimal('0'))
    assert_equal(res2['height'], 0)
    assert_equal(res2['txouts'], 0)
    # (a stray trailing comma used to turn this statement into a tuple)
    assert_equal(res2['bogosize'], 0)
    assert_equal(res2['bestblock'], node.getblockhash(0))
    assert_equal(len(res2['hash_serialized']), 64)

    self.log.info(
        "Test that gettxoutsetinfo() returns the same result after invalidate/reconsider block")
    node.reconsiderblock(b1hash)

    res3 = node.gettxoutsetinfo()
    for key in ('total_amount', 'transactions', 'height', 'txouts',
                'bogosize', 'bestblock', 'hash_serialized'):
        assert_equal(res[key], res3[key])
def _test_getblockheader(self):
    """Check getblockheader: the error for an unknown hash, then the
    values and types of the fields returned for the best block."""
    node = self.nodes[0]

    assert_raises_rpc_error(-5, "Block not found",
                            node.getblockheader, "nonsense")

    tip_hash = node.getbestblockhash()
    parent_hash = node.getblockhash(199)
    header = node.getblockheader(tip_hash)

    assert_equal(header['hash'], tip_hash)
    assert_equal(header['height'], 200)
    assert_equal(header['confirmations'], 1)
    assert_equal(header['previousblockhash'], parent_hash)

    assert_is_hex_string(header['chainwork'])
    for hash_field in ('hash', 'previousblockhash', 'merkleroot'):
        assert_is_hash_string(header[hash_field])
    # 'bits' is checked with the length check disabled
    assert_is_hash_string(header['bits'], length=None)

    for int_field in ('time', 'mediantime', 'nonce', 'version'):
        assert isinstance(header[int_field], int)
    # versionHex must parse as a base-16 integer
    assert isinstance(int(header['versionHex'], 16), int)
    assert isinstance(header['difficulty'], Decimal)
def _test_getdifficulty(self):
    """Check that getdifficulty is within tolerance of 1/2**31."""
    # 1 hash in 2 should be valid, so difficulty should be 1/2**31
    # binary => decimal => binary math is why we do this check
    scaled = self.nodes[0].getdifficulty() * 2**31
    assert abs(scaled - 1) < 0.0001
def _test_getnetworkhashps(self):
    """Check that getnetworkhashps is within tolerance of 1/300."""
    # This should be 2 hashes every 10 minutes or 1/300
    scaled = self.nodes[0].getnetworkhashps() * 300
    assert abs(scaled - 1) < 0.0001
def _test_stopatheight(self):
    """Mine up to height 206 and check the node keeps running, then mine
    one more block and check the node stops; after a restart the block
    count must be 207."""
    assert_equal(self.nodes[0].getblockcount(), 200)
    self.nodes[0].generate(6)
    assert_equal(self.nodes[0].getblockcount(), 206)

    self.log.debug('Node should not stop at this height')

    def wait_for_exit():
        # Raises subprocess.TimeoutExpired while the process is alive.
        return self.nodes[0].process.wait(timeout=3)

    assert_raises(subprocess.TimeoutExpired, wait_for_exit)

    try:
        self.nodes[0].generate(1)
    except (ConnectionError, http.client.BadStatusLine):
        # The node already shut down before response
        pass

    self.log.debug('Node should stop at this height...')
    self.nodes[0].wait_until_stopped()
    self.start_node(0)
    assert_equal(self.nodes[0].getblockcount(), 207)
def _test_getblock(self):
    """Cross-check the verbosity-2 getblock output for block 1 against
    gettransaction, getblockheader and getrawtransaction."""
    # Checks for getblock verbose outputs
    node = self.nodes[0]
    block = node.getblock(node.getblockhash(1), 2)
    first_tx = node.gettransaction(block['tx'][0]['txid'])
    header = node.getblockheader(node.getblockhash(1), True)

    assert_equal(block['hash'], first_tx['blockhash'])
    assert_equal(block['confirmations'], first_tx['confirmations'])

    for field in ('height', 'versionHex', 'version'):
        assert_equal(block[field], header[field])
    assert_equal(block['size'], 188)
    assert_equal(block['merkleroot'], header['merkleroot'])

    # Verify transaction data by check the hex values
    for tx in block['tx']:
        raw_tx = node.getrawtransaction(tx['txid'], True)
        assert_equal(tx['hex'], raw_tx['hex'])

    remaining_fields = (
        'time',
        'mediantime',
        'nonce',
        'bits',
        'difficulty',
        'chainwork',
        'previousblockhash',
        'nextblockhash',
    )
    for field in remaining_fields:
        assert_equal(block[field], header[field])
if __name__ == '__main__':
BlockchainTest().main()
diff --git a/test/functional/rpc_decodescript.py b/test/functional/rpc_decodescript.py
index 22ffb2c839..65240ef932 100755
--- a/test/functional/rpc_decodescript.py
+++ b/test/functional/rpc_decodescript.py
@@ -1,216 +1,216 @@
#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
-from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
from test_framework.messages import CTransaction, FromHex, ToHex
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import assert_equal, bytes_to_hex_str, hex_str_to_bytes
class DecodeScriptTest(BitcoinTestFramework):
def set_test_params(self):
    """Set the framework parameters: one node, `setup_clean_chain` enabled."""
    self.num_nodes = 1
    self.setup_clean_chain = True
def decodescript_script_sig(self):
    """Check the 'asm' output of decodescript for the standard scriptSig
    shapes (P2PK, P2PKH, multisig, P2SH)."""
    node = self.nodes[0]

    signature = '304502207fa7a6d1e0ee81132a269ad84e68d695483745cde8b541e3bf630749894e342a022100c1f7ab20e13e22fb95281a870f3dcf38d782e53023ee313d741ad0cfbc0c509001'
    public_key = '03b0da749730dc9b4b1f4a14d6902877a92541f5368778853d9c4a0cb7802dcfb2'
    # Each item is prefixed with its one-byte push length.
    push_signature = '48' + signature
    push_public_key = '21' + public_key

    # below are test cases for all of the standard transaction types

    # 1) P2PK scriptSig
    # the scriptSig of a public key scriptPubKey simply pushes a signature
    # onto the stack
    decoded = node.decodescript(push_signature)
    assert_equal(signature, decoded['asm'])

    # 2) P2PKH scriptSig
    decoded = node.decodescript(push_signature + push_public_key)
    assert_equal(' '.join([signature, public_key]), decoded['asm'])

    # 3) multisig scriptSig
    # this also tests the leading portion of a P2SH multisig scriptSig
    # OP_0 <A sig> <B sig>
    decoded = node.decodescript('00' + push_signature * 2)
    assert_equal(' '.join(['0', signature, signature]), decoded['asm'])

    # 4) P2SH scriptSig
    # an empty P2SH redeemScript is valid and makes for a very simple test case.
    # thus, such a spending scriptSig would just need to pass the outer redeemScript
    # hash test and leave true on the top of the stack.
    decoded = node.decodescript('5100')
    assert_equal('1 0', decoded['asm'])

    # 5) null data scriptSig - no such thing because null data scripts can not be spent.
    # thus, no test case for that standard transaction type is here.
def decodescript_script_pub_key(self):
    """Check the 'asm' output of decodescript for the standard
    scriptPubKey shapes plus a CLTV redeem script."""
    node = self.nodes[0]

    public_key = '03b0da749730dc9b4b1f4a14d6902877a92541f5368778853d9c4a0cb7802dcfb2'
    public_key_hash = '11695b6cd891484c2d49ec5aa738ec2b2f897777'
    # Each item is prefixed with its one-byte push length.
    push_public_key = '21' + public_key
    push_public_key_hash = '14' + public_key_hash

    # below are test cases for all of the standard transaction types

    # 1) P2PK scriptPubKey
    # <pubkey> OP_CHECKSIG
    decoded = node.decodescript(push_public_key + 'ac')
    assert_equal(public_key + ' OP_CHECKSIG', decoded['asm'])

    # 2) P2PKH scriptPubKey
    # OP_DUP OP_HASH160 <PubKeyHash> OP_EQUALVERIFY OP_CHECKSIG
    decoded = node.decodescript('76a9' + push_public_key_hash + '88ac')
    expected_asm = 'OP_DUP OP_HASH160 ' + public_key_hash + \
        ' OP_EQUALVERIFY OP_CHECKSIG'
    assert_equal(expected_asm, decoded['asm'])

    # 3) multisig scriptPubKey
    # <m> <A pubkey> <B pubkey> <C pubkey> <n> OP_CHECKMULTISIG
    # just imagine that the pub keys used below are different.
    # for our purposes here it does not matter that they are the same even
    # though it is unrealistic.
    decoded = node.decodescript('52' + push_public_key * 3 + '53ae')
    expected_asm = ' '.join(
        ['2', public_key, public_key, public_key, '3 OP_CHECKMULTISIG'])
    assert_equal(expected_asm, decoded['asm'])

    # 4) P2SH scriptPubKey
    # OP_HASH160 <Hash160(redeemScript)> OP_EQUAL.
    # push_public_key_hash here should actually be the hash of a redeem script.
    # but this works the same for purposes of this test.
    decoded = node.decodescript('a9' + push_public_key_hash + '87')
    assert_equal(
        'OP_HASH160 ' + public_key_hash + ' OP_EQUAL', decoded['asm'])

    # 5) null data scriptPubKey
    # use a signature look-alike here to make sure that we do not decode random data as a signature.
    # this matters if/when signature sighash decoding comes along.
    # would want to make sure that no such decoding takes place in this
    # case.
    signature_imposter = '48304502207fa7a6d1e0ee81132a269ad84e68d695483745cde8b541e3bf630749894e342a022100c1f7ab20e13e22fb95281a870f3dcf38d782e53023ee313d741ad0cfbc0c509001'
    # OP_RETURN <data>
    decoded = node.decodescript('6a' + signature_imposter)
    # The leading push-length byte is not part of the decoded data.
    assert_equal('OP_RETURN ' + signature_imposter[2:], decoded['asm'])

    # 6) a CLTV redeem script. redeem scripts are in-effect scriptPubKey scripts, so adding a test here.
    # OP_NOP2 is also known as OP_CHECKLOCKTIMEVERIFY.
    # just imagine that the pub keys used below are different.
    # for our purposes here it does not matter that they are the same even though it is unrealistic.
    #
    # OP_IF
    #     <receiver-pubkey> OP_CHECKSIGVERIFY
    # OP_ELSE
    #     <lock-until> OP_CHECKLOCKTIMEVERIFY OP_DROP
    # OP_ENDIF
    # <sender-pubkey> OP_CHECKSIG
    #
    # lock until block 500,000
    cltv_script = '63' + push_public_key + 'ad670320a107b17568' + push_public_key + 'ac'
    decoded = node.decodescript(cltv_script)
    expected_asm = 'OP_IF ' + public_key + \
        ' OP_CHECKSIGVERIFY OP_ELSE 500000 OP_CHECKLOCKTIMEVERIFY OP_DROP OP_ENDIF ' + \
        public_key + ' OP_CHECKSIG'
    assert_equal(expected_asm, decoded['asm'])
def decoderawtransaction_asm_sighashtype(self):
    """Tests decoding scripts via RPC command "decoderawtransaction".

    This test is in with the "decodescript" tests because they are testing the same "asm" script decodes.

    The statements are order-dependent: `tx` is rebound several times and
    `txSave` (parsed from the second transaction) is mutated in place
    between RPC calls.
    """
    # this test case uses a random plain vanilla mainnet transaction with a
    # single P2PKH input and output
    tx = '0100000001696a20784a2c70143f634e95227dbdfdf0ecd51647052e70854512235f5986ca010000008a47304402207174775824bec6c2700023309a168231ec80b82c6069282f5133e6f11cbb04460220570edc55c7c5da2ca687ebd0372d3546ebc3f810516a002350cac72dfe192dfb014104d3f898e6487787910a690410b7a917ef198905c27fb9d3b0a42da12aceae0544fc7088d239d9a48f2828a15a09e84043001f27cc80d162cb95404e1210161536ffffffff0100e1f505000000001976a914eb6c6e0cdb2d256a32d97b8df1fc75d1920d9bca88ac00000000'
    rpc_result = self.nodes[0].decoderawtransaction(tx)
    # the trailing sighash byte of the signature is expected to be rendered
    # as "[ALL]" in the scriptSig asm
    assert_equal(
        '304402207174775824bec6c2700023309a168231ec80b82c6069282f5133e6f11cbb04460220570edc55c7c5da2ca687ebd0372d3546ebc3f810516a002350cac72dfe192dfb[ALL] 04d3f898e6487787910a690410b7a917ef198905c27fb9d3b0a42da12aceae0544fc7088d239d9a48f2828a15a09e84043001f27cc80d162cb95404e1210161536', rpc_result['vin'][0]['scriptSig']['asm'])
    # this test case uses a mainnet transaction that has a P2SH input and both P2PKH and P2SH outputs.
    # it's from James D'Angelo's awesome introductory videos about multisig: https://www.youtube.com/watch?v=zIbUSaZBJgU and https://www.youtube.com/watch?v=OSA1pwlaypc
    # verify that we have not altered scriptPubKey decoding.
    tx = '01000000018d1f5635abd06e2c7e2ddf58dc85b3de111e4ad6e0ab51bb0dcf5e84126d927300000000fdfe0000483045022100ae3b4e589dfc9d48cb82d41008dc5fa6a86f94d5c54f9935531924602730ab8002202f88cf464414c4ed9fa11b773c5ee944f66e9b05cc1e51d97abc22ce098937ea01483045022100b44883be035600e9328a01b66c7d8439b74db64187e76b99a68f7893b701d5380220225bf286493e4c4adcf928c40f785422572eb232f84a0b83b0dea823c3a19c75014c695221020743d44be989540d27b1b4bbbcfd17721c337cb6bc9af20eb8a32520b393532f2102c0120a1dda9e51a938d39ddd9fe0ebc45ea97e1d27a7cbd671d5431416d3dd87210213820eb3d5f509d7438c9eeecb4157b2f595105e7cd564b3cdbb9ead3da41eed53aeffffffff02611e0000000000001976a914dc863734a218bfe83ef770ee9d41a27f824a6e5688acee2a02000000000017a9142a5edea39971049a540474c6a99edf0aa4074c588700000000'
    rpc_result = self.nodes[0].decoderawtransaction(tx)
    assert_equal(
        '8e3730608c3b0bb5df54f09076e196bc292a8e39a78e73b44b6ba08c78f5cbb0', rpc_result['txid'])
    assert_equal(
        '0 3045022100ae3b4e589dfc9d48cb82d41008dc5fa6a86f94d5c54f9935531924602730ab8002202f88cf464414c4ed9fa11b773c5ee944f66e9b05cc1e51d97abc22ce098937ea[ALL] 3045022100b44883be035600e9328a01b66c7d8439b74db64187e76b99a68f7893b701d5380220225bf286493e4c4adcf928c40f785422572eb232f84a0b83b0dea823c3a19c75[ALL] 5221020743d44be989540d27b1b4bbbcfd17721c337cb6bc9af20eb8a32520b393532f2102c0120a1dda9e51a938d39ddd9fe0ebc45ea97e1d27a7cbd671d5431416d3dd87210213820eb3d5f509d7438c9eeecb4157b2f595105e7cd564b3cdbb9ead3da41eed53ae', rpc_result['vin'][0]['scriptSig']['asm'])
    assert_equal(
        'OP_DUP OP_HASH160 dc863734a218bfe83ef770ee9d41a27f824a6e56 OP_EQUALVERIFY OP_CHECKSIG',
        rpc_result['vout'][0]['scriptPubKey']['asm'])
    assert_equal(
        'OP_HASH160 2a5edea39971049a540474c6a99edf0aa4074c58 OP_EQUAL',
        rpc_result['vout'][1]['scriptPubKey']['asm'])
    # keep a parsed copy of this transaction; its first input's scriptSig is
    # overwritten by the scriptSig test cases further down
    txSave = FromHex(CTransaction(), tx)
    # make sure that a specifically crafted op_return value will not pass
    # all the IsDERSignature checks and then get decoded as a sighash type
    tx = '01000000015ded05872fdbda629c7d3d02b194763ce3b9b1535ea884e3c8e765d42e316724020000006b48304502204c10d4064885c42638cbff3585915b322de33762598321145ba033fc796971e2022100bb153ad3baa8b757e30a2175bd32852d2e1cb9080f84d7e32fcdfd667934ef1b012103163c0ff73511ea1743fb5b98384a2ff09dd06949488028fd819f4d83f56264efffffffff0200000000000000000b6a0930060201000201000180380100000000001976a9141cabd296e753837c086da7a45a6c2fe0d49d7b7b88ac00000000'
    rpc_result = self.nodes[0].decoderawtransaction(tx)
    assert_equal('OP_RETURN 300602010002010001',
                 rpc_result['vout'][0]['scriptPubKey']['asm'])
    # verify that we have not altered scriptPubKey processing even of a
    # specially crafted P2PKH pubkeyhash and P2SH redeem script hash that
    # is made to pass the der signature checks
    tx = '01000000018d1f5635abd06e2c7e2ddf58dc85b3de111e4ad6e0ab51bb0dcf5e84126d927300000000fdfe0000483045022100ae3b4e589dfc9d48cb82d41008dc5fa6a86f94d5c54f9935531924602730ab8002202f88cf464414c4ed9fa11b773c5ee944f66e9b05cc1e51d97abc22ce098937ea01483045022100b44883be035600e9328a01b66c7d8439b74db64187e76b99a68f7893b701d5380220225bf286493e4c4adcf928c40f785422572eb232f84a0b83b0dea823c3a19c75014c695221020743d44be989540d27b1b4bbbcfd17721c337cb6bc9af20eb8a32520b393532f2102c0120a1dda9e51a938d39ddd9fe0ebc45ea97e1d27a7cbd671d5431416d3dd87210213820eb3d5f509d7438c9eeecb4157b2f595105e7cd564b3cdbb9ead3da41eed53aeffffffff02611e0000000000001976a914301102070101010101010102060101010101010188acee2a02000000000017a91430110207010101010101010206010101010101018700000000'
    rpc_result = self.nodes[0].decoderawtransaction(tx)
    assert_equal(
        'OP_DUP OP_HASH160 3011020701010101010101020601010101010101 OP_EQUALVERIFY OP_CHECKSIG',
        rpc_result['vout'][0]['scriptPubKey']['asm'])
    assert_equal(
        'OP_HASH160 3011020701010101010101020601010101010101 OP_EQUAL',
        rpc_result['vout'][1]['scriptPubKey']['asm'])
    # some more full transaction tests of varying specific scriptSigs. used instead of
    # tests in decodescript_script_sig because the decodescript RPC is specifically
    # for working on scriptPubKeys (argh!).
    # The saved scriptSig hex starts with '00' followed by a 0x48-byte push.
    # The slice skips the leading '00' (2 hex chars) and keeps the push-length
    # byte plus the 0x48 bytes it announces, i.e. the first signature push.
    push_signature = bytes_to_hex_str(
        txSave.vin[0].scriptSig)[2:(0x48 * 2 + 4)]
    # drop the one-byte push length
    signature = push_signature[2:]
    # drop the final byte (the sighash type, decoded as "[ALL]" below)
    der_signature = signature[:-2]
    signature_sighash_decoded = der_signature + '[ALL]'
    # same signature with sighash byte 0x82, expected to decode as
    # "[NONE|ANYONECANPAY]"
    signature_2 = der_signature + '82'
    push_signature_2 = '48' + signature_2
    signature_2_sighash_decoded = der_signature + '[NONE|ANYONECANPAY]'
    # 1) P2PK scriptSig
    txSave.vin[0].scriptSig = hex_str_to_bytes(push_signature)
    rpc_result = self.nodes[0].decoderawtransaction(ToHex(txSave))
    assert_equal(signature_sighash_decoded,
                 rpc_result['vin'][0]['scriptSig']['asm'])
    # make sure that the sighash decodes come out correctly for a more
    # complex / lesser used case.
    txSave.vin[0].scriptSig = hex_str_to_bytes(push_signature_2)
    rpc_result = self.nodes[0].decoderawtransaction(ToHex(txSave))
    assert_equal(signature_2_sighash_decoded,
                 rpc_result['vin'][0]['scriptSig']['asm'])
    # 2) multisig scriptSig
    txSave.vin[0].scriptSig = hex_str_to_bytes(
        '00' + push_signature + push_signature_2)
    rpc_result = self.nodes[0].decoderawtransaction(ToHex(txSave))
    assert_equal('0 ' + signature_sighash_decoded + ' ' +
                 signature_2_sighash_decoded, rpc_result['vin'][0]['scriptSig']['asm'])
    # 3) test a scriptSig that contains more than push operations.
    # in fact, it contains an OP_RETURN with data specially crafted to
    # cause improper decode if the code does not catch it.
    txSave.vin[0].scriptSig = hex_str_to_bytes(
        '6a143011020701010101010101020601010101010101')
    rpc_result = self.nodes[0].decoderawtransaction(ToHex(txSave))
    assert_equal('OP_RETURN 3011020701010101010101020601010101010101',
                 rpc_result['vin'][0]['scriptSig']['asm'])
def run_test(self):
    """Run the three decode sub-tests in order."""
    for step in (
            self.decodescript_script_sig,
            self.decodescript_script_pub_key,
            self.decoderawtransaction_asm_sighashtype):
        step()
if __name__ == '__main__':
DecodeScriptTest().main()
diff --git a/test/functional/rpc_fundrawtransaction.py b/test/functional/rpc_fundrawtransaction.py
index f1f061f58c..c6b7fea367 100755
--- a/test/functional/rpc_fundrawtransaction.py
+++ b/test/functional/rpc_fundrawtransaction.py
@@ -1,782 +1,791 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+from decimal import Decimal
+
+from test_framework.messages import CTransaction, FromHex
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
-from test_framework.mininode import CTransaction, FromHex
+from test_framework.util import (
+ assert_equal,
+ assert_fee_amount,
+ assert_greater_than,
+ assert_greater_than_or_equal,
+ assert_raises_rpc_error,
+ connect_nodes_bi,
+)
def get_unspent(listunspent, amount):
    """Return the first entry of `listunspent` whose 'amount' equals `amount`.

    Raises AssertionError when no entry matches.
    """
    matching = (utx for utx in listunspent if utx['amount'] == amount)
    found = next(matching, None)
    if found is None:
        raise AssertionError(
            'Could not find unspent with amount={}'.format(amount))
    return found
class RawTransactionsTest(BitcoinTestFramework):
def set_test_params(self):
    """Set the framework parameters: four nodes, `setup_clean_chain` enabled."""
    self.setup_clean_chain = True
    self.num_nodes = 4
def setup_network(self, split=False):
    """Set up the nodes and connect them pairwise in both directions.

    Links created: 0-1, 1-2, 0-2 and 0-3. The `split` argument is accepted
    but not used by this method.
    """
    self.setup_nodes()
    links = ((0, 1), (1, 2), (0, 2), (0, 3))
    for left, right in links:
        connect_nodes_bi(self.nodes[left], self.nodes[right])
def run_test(self):
min_relay_tx_fee = self.nodes[0].getnetworkinfo()['relayfee']
# This test is not meant to test fee estimation and we'd like
# to be sure all txs are sent at a consistent desired feerate
for node in self.nodes:
node.settxfee(min_relay_tx_fee)
# if the fee's positive delta is higher than this value tests will fail,
# neg. delta always fail the tests.
# The size of the signature of every input may be at most 2 bytes larger
# than a minimum sized signature.
# = 2 bytes * minRelayTxFeePerByte
feeTolerance = 2 * min_relay_tx_fee / 1000
self.nodes[2].generate(1)
self.sync_all()
self.nodes[0].generate(121)
self.sync_all()
# ensure that setting changePosition in fundraw with an exact match is handled properly
rawmatch = self.nodes[2].createrawtransaction(
[], {self.nodes[2].getnewaddress(): 50})
rawmatch = self.nodes[2].fundrawtransaction(
rawmatch, {"changePosition": 1, "subtractFeeFromOutputs": [0]})
assert_equal(rawmatch["changepos"], -1)
watchonly_address = self.nodes[0].getnewaddress()
watchonly_pubkey = self.nodes[
0].validateaddress(watchonly_address)["pubkey"]
watchonly_amount = Decimal(200)
self.nodes[3].importpubkey(watchonly_pubkey, "", True)
watchonly_txid = self.nodes[0].sendtoaddress(
watchonly_address, watchonly_amount)
self.nodes[0].sendtoaddress(
self.nodes[3].getnewaddress(), watchonly_amount / 10)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 1.5)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 1.0)
self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 5.0)
self.nodes[0].generate(1)
self.sync_all()
#
# simple test #
#
inputs = []
outputs = {self.nodes[0].getnewaddress(): 1.0}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
assert(len(dec_tx['vin']) > 0) # test that we have enough inputs
#
# simple test with two coins #
#
inputs = []
outputs = {self.nodes[0].getnewaddress(): 2.2}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
assert(len(dec_tx['vin']) > 0) # test if we have enough inputs
#
# simple test with two coins #
#
inputs = []
outputs = {self.nodes[0].getnewaddress(): 2.6}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
assert(len(dec_tx['vin']) > 0)
assert_equal(dec_tx['vin'][0]['scriptSig']['hex'], '')
#
# simple test with two outputs #
#
inputs = []
outputs = {
self.nodes[0].getnewaddress(): 2.6, self.nodes[1].getnewaddress(): 2.5}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
totalOut = 0
for out in dec_tx['vout']:
totalOut += out['value']
assert(len(dec_tx['vin']) > 0)
assert_equal(dec_tx['vin'][0]['scriptSig']['hex'], '')
#
# test a fundrawtransaction with a VIN greater than the required amount #
#
utx = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']}]
outputs = {self.nodes[0].getnewaddress(): 1.0}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
totalOut = 0
for out in dec_tx['vout']:
totalOut += out['value']
# compare vin total and totalout+fee
assert_equal(fee + totalOut, utx['amount'])
#
# test a fundrawtransaction with which will not get a change output #
#
utx = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']}]
outputs = {
self.nodes[0].getnewaddress(): Decimal(5.0) - fee - feeTolerance}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
totalOut = 0
for out in dec_tx['vout']:
totalOut += out['value']
assert_equal(rawtxfund['changepos'], -1)
assert_equal(fee + totalOut, utx['amount'])
# compare vin total and totalout+fee
#
# test a fundrawtransaction with an invalid option #
#
utx = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']}]
outputs = {self.nodes[0].getnewaddress(): Decimal(4.0)}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
assert_raises_rpc_error(-3, "Unexpected key foo", self.nodes[
2].fundrawtransaction, rawTx, {'foo': 'bar'})
#
# test a fundrawtransaction with an invalid change address #
#
utx = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']}]
outputs = {self.nodes[0].getnewaddress(): Decimal(4.0)}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
assert_raises_rpc_error(
-5, "changeAddress must be a valid bitcoin address",
self.nodes[2].fundrawtransaction, rawTx, {'changeAddress': 'foobar'})
#
# test a fundrawtransaction with a provided change address #
#
utx = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']}]
outputs = {self.nodes[0].getnewaddress(): Decimal(4.0)}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
change = self.nodes[2].getnewaddress()
assert_raises_rpc_error(-8, "changePosition out of bounds", self.nodes[
2].fundrawtransaction, rawTx, {'changeAddress': change, 'changePosition': 2})
rawtxfund = self.nodes[2].fundrawtransaction(
rawTx, {'changeAddress': change, 'changePosition': 0})
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
out = dec_tx['vout'][0]
assert_equal(change, out['scriptPubKey']['addresses'][0])
#
# test a fundrawtransaction with a VIN smaller than the required amount #
#
utx = get_unspent(self.nodes[2].listunspent(), 1)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']}]
outputs = {self.nodes[0].getnewaddress(): 1.0}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
# 4-byte version + 1-byte vin count + 36-byte prevout then script_len
rawTx = rawTx[:82] + "0100" + rawTx[84:]
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
assert_equal("00", dec_tx['vin'][0]['scriptSig']['hex'])
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
totalOut = 0
matchingOuts = 0
for i, out in enumerate(dec_tx['vout']):
totalOut += out['value']
if out['scriptPubKey']['addresses'][0] in outputs:
matchingOuts += 1
else:
assert_equal(i, rawtxfund['changepos'])
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
assert_equal("00", dec_tx['vin'][0]['scriptSig']['hex'])
assert_equal(matchingOuts, 1)
assert_equal(len(dec_tx['vout']), 2)
#
# test a fundrawtransaction with two VINs #
#
utx = get_unspent(self.nodes[2].listunspent(), 1)
utx2 = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']},
{'txid': utx2['txid'], 'vout': utx2['vout']}]
outputs = {self.nodes[0].getnewaddress(): 6.0}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
totalOut = 0
matchingOuts = 0
for out in dec_tx['vout']:
totalOut += out['value']
if out['scriptPubKey']['addresses'][0] in outputs:
matchingOuts += 1
assert_equal(matchingOuts, 1)
assert_equal(len(dec_tx['vout']), 2)
matchingIns = 0
for vinOut in dec_tx['vin']:
for vinIn in inputs:
if vinIn['txid'] == vinOut['txid']:
matchingIns += 1
# we now must see two vins identical to vins given as params
assert_equal(matchingIns, 2)
#
# test a fundrawtransaction with two VINs and two vOUTs #
#
utx = get_unspent(self.nodes[2].listunspent(), 1)
utx2 = get_unspent(self.nodes[2].listunspent(), 5)
inputs = [{'txid': utx['txid'], 'vout': utx['vout']},
{'txid': utx2['txid'], 'vout': utx2['vout']}]
outputs = {
self.nodes[0].getnewaddress(): 6.0, self.nodes[0].getnewaddress(): 1.0}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(utx['txid'], dec_tx['vin'][0]['txid'])
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
fee = rawtxfund['fee']
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
totalOut = 0
matchingOuts = 0
for out in dec_tx['vout']:
totalOut += out['value']
if out['scriptPubKey']['addresses'][0] in outputs:
matchingOuts += 1
assert_equal(matchingOuts, 2)
assert_equal(len(dec_tx['vout']), 3)
#
# test a fundrawtransaction with invalid vin #
#
inputs = [
{'txid': "1c7f966dab21119bac53213a2bc7532bff1fa844c124fd750a7d0b1332440bd1", 'vout': 0}]
# invalid vin!
outputs = {self.nodes[0].getnewaddress(): 1.0}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_raises_rpc_error(
-4, "Insufficient funds", self.nodes[2].fundrawtransaction, rawTx)
#
# compare fee of a standard pubkeyhash transaction
inputs = []
outputs = {self.nodes[1].getnewaddress(): 1.1}
rawTx = self.nodes[0].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[0].fundrawtransaction(rawTx)
# create same transaction over sendtoaddress
txId = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 1.1)
signedFee = self.nodes[0].getrawmempool(True)[txId]['fee']
# compare fee
feeDelta = Decimal(fundedTx['fee']) - Decimal(signedFee)
assert(feeDelta >= 0 and feeDelta <= feeTolerance)
#
#
# compare fee of a standard pubkeyhash transaction with multiple
# outputs
inputs = []
outputs = {self.nodes[1].getnewaddress(): 1.1, self.nodes[1].getnewaddress(): 1.2, self.nodes[1].getnewaddress(): 0.1, self.nodes[
1].getnewaddress(): 1.3, self.nodes[1].getnewaddress(): 0.2, self.nodes[1].getnewaddress(): 0.3}
rawTx = self.nodes[0].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[0].fundrawtransaction(rawTx)
# create same transaction over sendtoaddress
txId = self.nodes[0].sendmany("", outputs)
signedFee = self.nodes[0].getrawmempool(True)[txId]['fee']
# compare fee
feeDelta = Decimal(fundedTx['fee']) - Decimal(signedFee)
assert(feeDelta >= 0 and feeDelta <= feeTolerance)
#
#
# compare fee of a 2of2 multisig p2sh transaction
# create 2of2 addr
addr1 = self.nodes[1].getnewaddress()
addr2 = self.nodes[1].getnewaddress()
addr1Obj = self.nodes[1].validateaddress(addr1)
addr2Obj = self.nodes[1].validateaddress(addr2)
mSigObj = self.nodes[1].addmultisigaddress(
2, [addr1Obj['pubkey'], addr2Obj['pubkey']])
inputs = []
outputs = {mSigObj: 1.1}
rawTx = self.nodes[0].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[0].fundrawtransaction(rawTx)
# create same transaction over sendtoaddress
txId = self.nodes[0].sendtoaddress(mSigObj, 1.1)
signedFee = self.nodes[0].getrawmempool(True)[txId]['fee']
# compare fee
feeDelta = Decimal(fundedTx['fee']) - Decimal(signedFee)
assert(feeDelta >= 0 and feeDelta <= feeTolerance)
#
#
# compare fee of a standard pubkeyhash transaction
# create 4of5 addr
addr1 = self.nodes[1].getnewaddress()
addr2 = self.nodes[1].getnewaddress()
addr3 = self.nodes[1].getnewaddress()
addr4 = self.nodes[1].getnewaddress()
addr5 = self.nodes[1].getnewaddress()
addr1Obj = self.nodes[1].validateaddress(addr1)
addr2Obj = self.nodes[1].validateaddress(addr2)
addr3Obj = self.nodes[1].validateaddress(addr3)
addr4Obj = self.nodes[1].validateaddress(addr4)
addr5Obj = self.nodes[1].validateaddress(addr5)
mSigObj = self.nodes[1].addmultisigaddress(
4, [addr1Obj['pubkey'], addr2Obj['pubkey'], addr3Obj['pubkey'], addr4Obj['pubkey'], addr5Obj['pubkey']])
inputs = []
outputs = {mSigObj: 1.1}
rawTx = self.nodes[0].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[0].fundrawtransaction(rawTx)
# create same transaction over sendtoaddress
txId = self.nodes[0].sendtoaddress(mSigObj, 1.1)
signedFee = self.nodes[0].getrawmempool(True)[txId]['fee']
# compare fee
feeDelta = Decimal(fundedTx['fee']) - Decimal(signedFee)
assert(feeDelta >= 0 and feeDelta <= feeTolerance)
#
#
# spend a 2of2 multisig transaction over fundraw
# create 2of2 addr
addr1 = self.nodes[2].getnewaddress()
addr2 = self.nodes[2].getnewaddress()
addr1Obj = self.nodes[2].validateaddress(addr1)
addr2Obj = self.nodes[2].validateaddress(addr2)
mSigObj = self.nodes[2].addmultisigaddress(
2, [addr1Obj['pubkey'], addr2Obj['pubkey']])
# send 1.2 BTC to msig addr
txId = self.nodes[0].sendtoaddress(mSigObj, 1.2)
self.sync_all()
self.nodes[1].generate(1)
self.sync_all()
oldBalance = self.nodes[1].getbalance()
inputs = []
outputs = {self.nodes[1].getnewaddress(): 1.1}
rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[2].fundrawtransaction(rawTx)
signedTx = self.nodes[2].signrawtransaction(fundedTx['hex'])
txId = self.nodes[2].sendrawtransaction(signedTx['hex'])
self.sync_all()
self.nodes[1].generate(1)
self.sync_all()
# make sure funds are received at node1
assert_equal(
oldBalance + Decimal('1.10000000'), self.nodes[1].getbalance())
#
# locked wallet test
self.stop_node(0)
self.nodes[1].node_encrypt_wallet("test")
self.stop_node(2)
self.stop_node(3)
self.start_nodes()
# This test is not meant to test fee estimation and we'd like
# to be sure all txs are sent at a consistent desired feerate
for node in self.nodes:
node.settxfee(min_relay_tx_fee)
connect_nodes_bi(self.nodes[0], self.nodes[1])
connect_nodes_bi(self.nodes[1], self.nodes[2])
connect_nodes_bi(self.nodes[0], self.nodes[2])
connect_nodes_bi(self.nodes[0], self.nodes[3])
self.sync_all()
# drain the keypool
self.nodes[1].getnewaddress()
self.nodes[1].getrawchangeaddress()
inputs = []
outputs = {self.nodes[0].getnewaddress(): 1.1}
rawTx = self.nodes[1].createrawtransaction(inputs, outputs)
# fund a transaction that requires a new key for the change output
# creating the key must be impossible because the wallet is locked
assert_raises_rpc_error(-4, "Keypool ran out, please call keypoolrefill first",
self.nodes[1].fundrawtransaction, rawTx)
# refill the keypool
self.nodes[1].walletpassphrase("test", 100)
# need to refill the keypool to get an internal change address
self.nodes[1].keypoolrefill(8)
self.nodes[1].walletlock()
assert_raises_rpc_error(-13, "walletpassphrase", self.nodes[
1].sendtoaddress, self.nodes[0].getnewaddress(), 1.2)
oldBalance = self.nodes[0].getbalance()
inputs = []
outputs = {self.nodes[0].getnewaddress(): 1.1}
rawTx = self.nodes[1].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[1].fundrawtransaction(rawTx)
# now we need to unlock
self.nodes[1].walletpassphrase("test", 600)
signedTx = self.nodes[1].signrawtransaction(fundedTx['hex'])
txId = self.nodes[1].sendrawtransaction(signedTx['hex'])
self.nodes[1].generate(1)
self.sync_all()
# make sure funds are received at node1
assert_equal(
oldBalance + Decimal('51.10000000'), self.nodes[0].getbalance())
#
# multiple (~19) inputs tx test | Compare fee #
#
# empty node1, send some small coins from node0 to node1
self.nodes[1].sendtoaddress(
self.nodes[0].getnewaddress(), self.nodes[1].getbalance(), "", "", True)
self.sync_all()
self.nodes[0].generate(1)
self.sync_all()
for i in range(0, 20):
self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.01)
self.nodes[0].generate(1)
self.sync_all()
# fund a tx with ~20 small inputs
inputs = []
outputs = {
self.nodes[0].getnewaddress(): 0.15, self.nodes[0].getnewaddress(): 0.04}
rawTx = self.nodes[1].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[1].fundrawtransaction(rawTx)
# create same transaction over sendtoaddress
txId = self.nodes[1].sendmany("", outputs)
signedFee = self.nodes[1].getrawmempool(True)[txId]['fee']
# compare fee
feeDelta = Decimal(fundedTx['fee']) - Decimal(signedFee)
assert(feeDelta >= 0 and feeDelta <= feeTolerance * 19) # ~19 inputs
#
# multiple (~19) inputs tx test | sign/send #
#
# again, empty node1, send some small coins from node0 to node1
self.nodes[1].sendtoaddress(
self.nodes[0].getnewaddress(), self.nodes[1].getbalance(), "", "", True)
self.sync_all()
self.nodes[0].generate(1)
self.sync_all()
for i in range(0, 20):
self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.01)
self.nodes[0].generate(1)
self.sync_all()
# fund a tx with ~20 small inputs
oldBalance = self.nodes[0].getbalance()
inputs = []
outputs = {
self.nodes[0].getnewaddress(): 0.15, self.nodes[0].getnewaddress(): 0.04}
rawTx = self.nodes[1].createrawtransaction(inputs, outputs)
fundedTx = self.nodes[1].fundrawtransaction(rawTx)
fundedAndSignedTx = self.nodes[1].signrawtransaction(fundedTx['hex'])
txId = self.nodes[1].sendrawtransaction(fundedAndSignedTx['hex'])
self.sync_all()
self.nodes[0].generate(1)
self.sync_all()
assert_equal(oldBalance + Decimal('50.19000000'),
self.nodes[0].getbalance()) # 0.19+block reward
#
# test fundrawtransaction with OP_RETURN and no vin #
#
rawTx = "0100000000010000000000000000066a047465737400000000"
dec_tx = self.nodes[2].decoderawtransaction(rawTx)
assert_equal(len(dec_tx['vin']), 0)
assert_equal(len(dec_tx['vout']), 1)
rawtxfund = self.nodes[2].fundrawtransaction(rawTx)
dec_tx = self.nodes[2].decoderawtransaction(rawtxfund['hex'])
assert_greater_than(len(dec_tx['vin']), 0) # at least one vin
assert_equal(len(dec_tx['vout']), 2) # one change output added
#
# test a fundrawtransaction using only watchonly #
#
inputs = []
outputs = {self.nodes[2].getnewaddress(): watchonly_amount / 2}
rawTx = self.nodes[3].createrawtransaction(inputs, outputs)
result = self.nodes[3].fundrawtransaction(
rawTx, {'includeWatching': True})
res_dec = self.nodes[0].decoderawtransaction(result["hex"])
assert_equal(len(res_dec["vin"]), 1)
assert_equal(res_dec["vin"][0]["txid"], watchonly_txid)
assert("fee" in result.keys())
assert_greater_than(result["changepos"], -1)
#
# test fundrawtransaction using the entirety of watched funds #
#
inputs = []
outputs = {self.nodes[2].getnewaddress(): watchonly_amount}
rawTx = self.nodes[3].createrawtransaction(inputs, outputs)
# Backward compatibility test (2nd param is includeWatching)
result = self.nodes[3].fundrawtransaction(rawTx, True)
res_dec = self.nodes[0].decoderawtransaction(result["hex"])
assert_equal(len(res_dec["vin"]), 2)
assert(res_dec["vin"][0]["txid"] == watchonly_txid or res_dec[
"vin"][1]["txid"] == watchonly_txid)
assert_greater_than(result["fee"], 0)
assert_greater_than(result["changepos"], -1)
assert_equal(result["fee"] + res_dec["vout"][
result["changepos"]]["value"], watchonly_amount / 10)
signedtx = self.nodes[3].signrawtransaction(result["hex"])
assert(not signedtx["complete"])
signedtx = self.nodes[0].signrawtransaction(signedtx["hex"])
assert(signedtx["complete"])
self.nodes[0].sendrawtransaction(signedtx["hex"])
self.nodes[0].generate(1)
self.sync_all()
#
# Test feeRate option #
#
# Make sure there is exactly one input so coin selection can't skew the
# result
assert_equal(len(self.nodes[3].listunspent(1)), 1)
inputs = []
outputs = {self.nodes[3].getnewaddress(): 1}
rawTx = self.nodes[3].createrawtransaction(inputs, outputs)
result = self.nodes[3].fundrawtransaction(
rawTx) # uses min_relay_tx_fee (set by settxfee)
result2 = self.nodes[3].fundrawtransaction(
rawTx, {"feeRate": 2 * min_relay_tx_fee})
result_fee_rate = result['fee'] * 1000 / \
FromHex(CTransaction(), result['hex']).billable_size()
assert_fee_amount(
result2['fee'], FromHex(CTransaction(), result2['hex']).billable_size(), 2 * result_fee_rate)
result3 = self.nodes[3].fundrawtransaction(
rawTx, {"feeRate": 10 * min_relay_tx_fee})
# allow this transaction to be underfunded by 10 bytes. This is due
# to the first transaction possibly being overfunded by up to .9
# satoshi due to fee ceilings being used.
assert_fee_amount(
result3['fee'], FromHex(CTransaction(), result3['hex']).billable_size(), 10 * result_fee_rate, 10)
#
# Test address reuse option #
#
result3 = self.nodes[3].fundrawtransaction(
rawTx, {"reserveChangeKey": False})
res_dec = self.nodes[0].decoderawtransaction(result3["hex"])
changeaddress = ""
for out in res_dec['vout']:
if out['value'] > 1.0:
changeaddress += out['scriptPubKey']['addresses'][0]
assert(changeaddress != "")
nextaddr = self.nodes[3].getrawchangeaddress()
# frt should not have removed the key from the keypool
assert(changeaddress == nextaddr)
result3 = self.nodes[3].fundrawtransaction(rawTx)
res_dec = self.nodes[0].decoderawtransaction(result3["hex"])
changeaddress = ""
for out in res_dec['vout']:
if out['value'] > 1.0:
changeaddress += out['scriptPubKey']['addresses'][0]
assert(changeaddress != "")
nextaddr = self.nodes[3].getnewaddress()
# Now the change address key should be removed from the keypool
assert(changeaddress != nextaddr)
#
# Test subtractFeeFromOutputs option #
#
# Make sure there is exactly one input so coin selection can't skew the
# result
assert_equal(len(self.nodes[3].listunspent(1)), 1)
inputs = []
outputs = {self.nodes[2].getnewaddress(): 1}
rawTx = self.nodes[3].createrawtransaction(inputs, outputs)
result = [self.nodes[3].fundrawtransaction(rawTx), # uses min_relay_tx_fee (set by settxfee)
self.nodes[3].fundrawtransaction(
rawTx, {"subtractFeeFromOutputs": []}), # empty subtraction list
self.nodes[3].fundrawtransaction(
rawTx, {"subtractFeeFromOutputs": [0]}), # uses min_relay_tx_fee (set by settxfee)
self.nodes[3].fundrawtransaction(
rawTx, {"feeRate": 2 * min_relay_tx_fee}),
self.nodes[3].fundrawtransaction(rawTx, {"feeRate": 2 * min_relay_tx_fee, "subtractFeeFromOutputs": [0]})]
dec_tx = [self.nodes[3].decoderawtransaction(tx['hex'])
for tx in result]
output = [d['vout'][1 - r['changepos']]['value']
for d, r in zip(dec_tx, result)]
change = [d['vout'][r['changepos']]['value']
for d, r in zip(dec_tx, result)]
assert_equal(result[0]['fee'], result[1]['fee'], result[2]['fee'])
assert_equal(result[3]['fee'], result[4]['fee'])
assert_equal(change[0], change[1])
assert_equal(output[0], output[1])
assert_equal(output[0], output[2] + result[2]['fee'])
assert_equal(change[0] + result[0]['fee'], change[2])
assert_equal(output[3], output[4] + result[4]['fee'])
assert_equal(change[3] + result[3]['fee'], change[4])
inputs = []
outputs = {
self.nodes[2].getnewaddress(): value for value in (1.0, 1.1, 1.2, 1.3)}
keys = list(outputs.keys())
rawTx = self.nodes[3].createrawtransaction(inputs, outputs)
result = [self.nodes[3].fundrawtransaction(rawTx),
# split the fee between outputs 0, 2, and 3, but not output 1
self.nodes[3].fundrawtransaction(rawTx, {"subtractFeeFromOutputs": [0, 2, 3]})]
dec_tx = [self.nodes[3].decoderawtransaction(result[0]['hex']),
self.nodes[3].decoderawtransaction(result[1]['hex'])]
# Nested list of non-change output amounts for each transaction
output = [[out['value'] for i, out in enumerate(d['vout']) if i != r['changepos']]
for d, r in zip(dec_tx, result)]
# List of differences in output amounts between normal and subtractFee
# transactions
share = [o0 - o1 for o0, o1 in zip(output[0], output[1])]
# output 1 is the same in both transactions
assert_equal(share[1], 0)
# the other 3 outputs are smaller as a result of subtractFeeFromOutputs
assert_greater_than(share[0], 0)
assert_greater_than(share[2], 0)
assert_greater_than(share[3], 0)
# outputs 2 and 3 take the same share of the fee
assert_equal(share[2], share[3])
# output 0 takes at least as much share of the fee, and no more than 2
# satoshis more, than outputs 2 and 3
assert_greater_than_or_equal(share[0], share[2])
assert_greater_than_or_equal(share[2] + Decimal(2e-8), share[0])
# the fee is the same in both transactions
assert_equal(result[0]['fee'], result[1]['fee'])
# the total subtracted from the outputs is equal to the fee
assert_equal(share[0] + share[2] + share[3], result[0]['fee'])
# Entry point when executed as a script: instantiate the test and call its main().
if __name__ == '__main__':
RawTransactionsTest().main()
diff --git a/test/functional/rpc_invalidateblock.py b/test/functional/rpc_invalidateblock.py
index b0cd3a1ef9..9183deb76b 100755
--- a/test/functional/rpc_invalidateblock.py
+++ b/test/functional/rpc_invalidateblock.py
@@ -1,74 +1,76 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test InvalidateBlock code
#
+import time
+
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
+from test_framework.util import assert_equal, connect_nodes_bi, sync_blocks
class InvalidateTest(BitcoinTestFramework):
    """Check tip selection after invalidateblock on three nodes."""

    def set_test_params(self):
        """Three nodes on a clean chain; only node 0 gets -noparkdeepreorg."""
        self.num_nodes = 3
        self.setup_clean_chain = True
        self.extra_args = [["-noparkdeepreorg"], [], []]

    def setup_network(self):
        """Start the nodes unconnected; run_test connects them as needed."""
        self.setup_nodes()

    def run_test(self):
        """Invalidate blocks on each node and verify the resulting heights."""
        log = self.log.info
        node0 = self.nodes[0]
        node1 = self.nodes[1]
        node2 = self.nodes[2]

        log("Make sure we repopulate setBlockIndexCandidates after InvalidateBlock:")
        log("Mine 4 blocks on Node 0")
        node0.generate(4)
        assert node0.getblockcount() == 4
        besthash = node0.getbestblockhash()

        log("Mine competing 6 blocks on Node 1")
        node1.generate(6)
        assert node1.getblockcount() == 6

        log("Connect nodes to force a reorg")
        connect_nodes_bi(node0, node1)
        sync_blocks(self.nodes[0:2])
        assert node0.getblockcount() == 6
        badhash = node1.getblockhash(2)

        log("Invalidate block 2 on node 0 and verify we reorg to node 0's original chain")
        node0.invalidateblock(badhash)
        tip_height = node0.getblockcount()
        tip_hash = node0.getbestblockhash()
        # Node 0 must be back on the 4-block chain it mined itself.
        if not (tip_height == 4 and tip_hash == besthash):
            raise AssertionError(
                "Wrong tip for node0, hash {}, height {}".format(
                    tip_hash, tip_height))

        log("\nMake sure we won't reorg to a lower work chain:")
        connect_nodes_bi(node1, node2)
        log("Sync node 2 to node 1 so both have 6 blocks")
        sync_blocks(self.nodes[1:3])
        assert node2.getblockcount() == 6

        log("Invalidate block 5 on node 1 so its tip is now at 4")
        fifth_block = node1.getblockhash(5)
        node1.invalidateblock(fifth_block)
        assert node1.getblockcount() == 4

        log("Invalidate block 3 on node 2, so its tip is now 2")
        third_block = node2.getblockhash(3)
        node2.invalidateblock(third_block)
        assert node2.getblockcount() == 2

        log("..and then mine a block")
        node2.generate(1)

        log("Verify all nodes are at the right height")
        # Fixed wait before the final height checks.
        time.sleep(5)
        assert_equal(node2.getblockcount(), 3)
        assert_equal(node0.getblockcount(), 4)
        node1height = node1.getblockcount()
        if node1height < 4:
            raise AssertionError(
                "Node 1 reorged to a lower height: {}".format(node1height))
# Entry point when executed as a script: instantiate the test and call its main().
if __name__ == '__main__':
InvalidateTest().main()
diff --git a/test/functional/rpc_listtransactions.py b/test/functional/rpc_listtransactions.py
index 6b565cb85a..779ac9f3f0 100755
--- a/test/functional/rpc_listtransactions.py
+++ b/test/functional/rpc_listtransactions.py
@@ -1,94 +1,96 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
# Exercise the listtransactions API
+from decimal import Decimal
+
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
+from test_framework.util import assert_array_result
class ListTransactionsTest(BitcoinTestFramework):
    """Exercise the listtransactions RPC between two nodes."""

    def set_test_params(self):
        """Two nodes, with mocktime enabled."""
        self.num_nodes = 2
        self.enable_mocktime()

    def run_test(self):
        """Send payments in several shapes and check how each is listed."""
        node0 = self.nodes[0]
        node1 = self.nodes[1]

        def expect_simple_send(sent_txid, confirmations):
            # Sender and receiver must both list the 0.1 payment with the
            # given confirmation count.
            for node, category, amount in ((node0, "send", "-0.1"),
                                           (node1, "receive", "0.1")):
                assert_array_result(node.listtransactions(),
                                    {"txid": sent_txid},
                                    {"category": category, "account": "", "amount": Decimal(amount), "confirmations": confirmations})

        # Simple send, 0 to 1:
        txid = node0.sendtoaddress(node1.getnewaddress(), 0.1)
        self.sync_all()
        expect_simple_send(txid, 0)

        # mine a block, confirmations should change:
        node0.generate(1)
        self.sync_all()
        expect_simple_send(txid, 1)

        # send-to-self: listed once as a send and once as a receive
        txid = node0.sendtoaddress(node0.getnewaddress(), 0.2)
        for category, amount in (("send", "-0.2"), ("receive", "0.2")):
            assert_array_result(node0.listtransactions(),
                                {"txid": txid, "category": category},
                                {"amount": Decimal(amount)})

        # sendmany from nodes[1]: two outputs to nodes[0], two to itself
        send_to = {node0.getnewaddress(): 0.11,
                   node1.getnewaddress(): 0.22,
                   node0.getaccountaddress("from1"): 0.33,
                   node1.getaccountaddress("toself"): 0.44}
        txid = node1.sendmany("", send_to)
        self.sync_all()
        # (node to query, category, amount, fields the entry must carry)
        sendmany_checks = (
            (node1, "send", "-0.11", {"txid": txid}),
            (node0, "receive", "0.11", {"txid": txid}),
            (node1, "send", "-0.22", {"txid": txid}),
            (node1, "receive", "0.22", {"txid": txid}),
            (node1, "send", "-0.33", {"txid": txid}),
            (node0, "receive", "0.33", {"txid": txid, "account": "from1"}),
            (node1, "send", "-0.44", {"txid": txid, "account": ""}),
            (node1, "receive", "0.44", {"txid": txid, "account": "toself"}),
        )
        for node, category, amount, expected in sendmany_checks:
            assert_array_result(node.listtransactions(),
                                {"category": category, "amount": Decimal(amount)},
                                expected)

        # Watch-only: node0 imports the redeem script of a multisig address
        # created on node1, then node1 pays that address.
        fresh_address = node1.getnewaddress()
        multisig = node1.createmultisig(1, [fresh_address])
        node0.importaddress(
            multisig["redeemScript"], "watchonly", False, True)
        txid = node1.sendtoaddress(multisig["address"], 0.1)
        node1.generate(1)
        self.sync_all()
        # With the last argument False nothing is listed for the account;
        # with True the payment shows up.
        hidden = node0.listtransactions("watchonly", 100, 0, False)
        assert len(hidden) == 0
        assert_array_result(
            node0.listtransactions("watchonly", 100, 0, True),
            {"category": "receive", "amount": Decimal("0.1")},
            {"txid": txid, "account": "watchonly"})
# Entry point when executed as a script: instantiate the test and call its main().
if __name__ == '__main__':
ListTransactionsTest().main()
diff --git a/test/functional/rpc_rawtransaction.py b/test/functional/rpc_rawtransaction.py
index 80572e6110..d2b6169a27 100755
--- a/test/functional/rpc_rawtransaction.py
+++ b/test/functional/rpc_rawtransaction.py
@@ -1,295 +1,301 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""rawtranscation RPCs QA test.
# Tests the following RPCs:
# - createrawtransaction
# - signrawtransaction
# - sendrawtransaction
# - decoderawtransaction
# - getrawtransaction
"""
+from decimal import Decimal
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
from test_framework.txtools import pad_raw_tx
+from test_framework.util import (
+ assert_equal,
+ assert_greater_than,
+ assert_raises_rpc_error,
+ connect_nodes_bi,
+)
# Create one-input, one-output, no-fee transaction:
class RawTransactionsTest(BitcoinTestFramework):
    """Exercise the raw-transaction RPCs, including multisig signing flows."""

    def set_test_params(self):
        """Request three nodes starting from a clean chain."""
        self.setup_clean_chain = True
        self.num_nodes = 3

    def setup_network(self, split=False):
        """Run the framework's network setup, then link node0 and node2.

        The `split` parameter is accepted but not used by this override.
        """
        super().setup_network()
        connect_nodes_bi(self.nodes[0], self.nodes[2])

    def run_test(self):
        """Run the raw-transaction scenarios in sequence.

        Covers, in order: sendrawtransaction with a missing input, a 2-of-2
        multisig deposit, a 2-of-3 multisig spend signed across nodes, a
        2-of-2 spend assembled with combinerawtransaction, the verbosity
        argument of getrawtransaction, and createrawtransaction validation
        of the `vout` and `sequence` input fields.
        """
        # prepare some coins for multiple *rawtransaction commands
        self.nodes[2].generate(1)
        self.sync_all()
        self.nodes[0].generate(101)
        self.sync_all()
        self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 1.5)
        self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 1.0)
        self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 5.0)
        self.sync_all()
        self.nodes[0].generate(5)
        self.sync_all()

        #
        # sendrawtransaction with missing input #
        #
        # The referenced txid does not exist, so the input cannot be found.
        inputs = [
            {'txid': "1d1d4e24ed99057e84c3f80fd8fbec79ed9e1acee37da269356ecea000000000", 'vout': 1}]
        outputs = {self.nodes[0].getnewaddress(): 4.998}
        rawtx = self.nodes[2].createrawtransaction(inputs, outputs)
        rawtx = pad_raw_tx(rawtx)
        rawtx = self.nodes[2].signrawtransaction(rawtx)

        # This will raise an exception since there are missing inputs
        assert_raises_rpc_error(
            -25, "Missing inputs", self.nodes[2].sendrawtransaction, rawtx['hex'])

        #
        # RAW TX MULTISIG TESTS #
        #
        # 2of2 test
        addr1 = self.nodes[2].getnewaddress()
        addr2 = self.nodes[2].getnewaddress()

        addr1Obj = self.nodes[2].validateaddress(addr1)
        addr2Obj = self.nodes[2].validateaddress(addr2)

        mSigObj = self.nodes[2].addmultisigaddress(
            2, [addr1Obj['pubkey'], addr2Obj['pubkey']])

        # use balance deltas instead of absolute values
        bal = self.nodes[2].getbalance()

        # send 1.2 BTC to msig adr
        txId = self.nodes[0].sendtoaddress(mSigObj, 1.2)
        self.sync_all()
        self.nodes[0].generate(1)
        self.sync_all()
        # node2 has both keys of the 2of2 ms addr., tx should affect the
        # balance
        assert_equal(self.nodes[2].getbalance(), bal + Decimal('1.20000000'))

        # 2of3 test from different nodes
        bal = self.nodes[2].getbalance()
        addr1 = self.nodes[1].getnewaddress()
        addr2 = self.nodes[2].getnewaddress()
        addr3 = self.nodes[2].getnewaddress()

        addr1Obj = self.nodes[1].validateaddress(addr1)
        addr2Obj = self.nodes[2].validateaddress(addr2)
        addr3Obj = self.nodes[2].validateaddress(addr3)

        mSigObj = self.nodes[2].addmultisigaddress(
            2, [addr1Obj['pubkey'], addr2Obj['pubkey'], addr3Obj['pubkey']])

        txId = self.nodes[0].sendtoaddress(mSigObj, 2.2)
        decTx = self.nodes[0].gettransaction(txId)
        rawTx = self.nodes[0].decoderawtransaction(decTx['hex'])
        sPK = rawTx['vout'][0]['scriptPubKey']['hex']
        self.sync_all()
        self.nodes[0].generate(1)
        self.sync_all()

        # THIS IS AN INCOMPLETE FEATURE
        # NODE2 HAS TWO OF THREE KEYS AND THE FUNDS SHOULD BE SPENDABLE AND
        # COUNT AT BALANCE CALCULATION
        # for now, assume the funds of a 2of3 multisig tx are not marked as
        # spendable
        assert_equal(self.nodes[2].getbalance(), bal)

        txDetails = self.nodes[0].gettransaction(txId, True)
        rawTx = self.nodes[0].decoderawtransaction(txDetails['hex'])
        # Locate the output that carries the 2.2 multisig payment.
        vout = False
        for outpoint in rawTx['vout']:
            if outpoint['value'] == Decimal('2.20000000'):
                vout = outpoint
                break

        bal = self.nodes[0].getbalance()
        inputs = [{
            "txid": txId,
            "vout": vout['n'],
            "scriptPubKey": vout['scriptPubKey']['hex'],
            "amount": vout['value'],
        }]
        outputs = {self.nodes[0].getnewaddress(): 2.19}
        rawTx = self.nodes[2].createrawtransaction(inputs, outputs)
        rawTxPartialSigned = self.nodes[1].signrawtransaction(rawTx, inputs)
        # node1 only has one key, can't comp. sign the tx
        assert_equal(rawTxPartialSigned['complete'], False)

        rawTxSigned = self.nodes[2].signrawtransaction(rawTx, inputs)
        # node2 can sign the tx compl., own two of three keys
        assert_equal(rawTxSigned['complete'], True)
        self.nodes[2].sendrawtransaction(rawTxSigned['hex'])
        rawTx = self.nodes[0].decoderawtransaction(rawTxSigned['hex'])
        self.sync_all()
        self.nodes[0].generate(1)
        self.sync_all()
        assert_equal(self.nodes[0].getbalance(), bal + Decimal(
            '50.00000000') + Decimal('2.19000000'))  # block reward + tx
        # Remember the block holding the 2of3 spend for the verbose
        # getrawtransaction checks further down.
        rawTxBlock = self.nodes[0].getblock(self.nodes[0].getbestblockhash())

        # 2of2 test for combining transactions
        bal = self.nodes[2].getbalance()
        addr1 = self.nodes[1].getnewaddress()
        addr2 = self.nodes[2].getnewaddress()

        addr1Obj = self.nodes[1].validateaddress(addr1)
        addr2Obj = self.nodes[2].validateaddress(addr2)

        self.nodes[1].addmultisigaddress(
            2, [addr1Obj['pubkey'], addr2Obj['pubkey']])
        mSigObj = self.nodes[2].addmultisigaddress(
            2, [addr1Obj['pubkey'], addr2Obj['pubkey']])
        mSigObjValid = self.nodes[2].validateaddress(mSigObj)

        txId = self.nodes[0].sendtoaddress(mSigObj, 2.2)
        decTx = self.nodes[0].gettransaction(txId)
        rawTx2 = self.nodes[0].decoderawtransaction(decTx['hex'])
        self.sync_all()
        self.nodes[0].generate(1)
        self.sync_all()

        # the funds of a 2of2 multisig tx should not be marked as spendable
        assert_equal(self.nodes[2].getbalance(), bal)

        txDetails = self.nodes[0].gettransaction(txId, True)
        rawTx2 = self.nodes[0].decoderawtransaction(txDetails['hex'])
        vout = False
        for outpoint in rawTx2['vout']:
            if outpoint['value'] == Decimal('2.20000000'):
                vout = outpoint
                break

        bal = self.nodes[0].getbalance()
        inputs = [{
            "txid": txId,
            "vout": vout['n'],
            "scriptPubKey": vout['scriptPubKey']['hex'],
            "redeemScript": mSigObjValid['hex'],
            "amount": vout['value'],
        }]
        outputs = {self.nodes[0].getnewaddress(): 2.19}
        rawTx2 = self.nodes[2].createrawtransaction(inputs, outputs)
        rawTxPartialSigned1 = self.nodes[1].signrawtransaction(rawTx2, inputs)
        self.log.info(rawTxPartialSigned1)
        # node1 only has one key, can't comp. sign the tx.
        # Check the result just produced for this 2of2 spend; the previous
        # code re-asserted the stale 2of3 result (rawTxPartialSigned).
        assert_equal(rawTxPartialSigned1['complete'], False)

        rawTxPartialSigned2 = self.nodes[2].signrawtransaction(rawTx2, inputs)
        self.log.info(rawTxPartialSigned2)
        # node2 only has one key, can't comp. sign the tx
        assert_equal(rawTxPartialSigned2['complete'], False)
        rawTxComb = self.nodes[2].combinerawtransaction(
            [rawTxPartialSigned1['hex'], rawTxPartialSigned2['hex']])
        self.log.info(rawTxComb)
        self.nodes[2].sendrawtransaction(rawTxComb)
        rawTx2 = self.nodes[0].decoderawtransaction(rawTxComb)
        self.sync_all()
        self.nodes[0].generate(1)
        self.sync_all()
        assert_equal(
            self.nodes[0].getbalance(),
            bal + Decimal('50.00000000') + Decimal('2.19000000'))  # block reward + tx

        # getrawtransaction tests
        # (rawTx still refers to the decoded 2of3 spend signed by node2)
        # 1. valid parameters - only supply txid
        txHash = rawTx["hash"]
        assert_equal(
            self.nodes[0].getrawtransaction(txHash), rawTxSigned['hex'])

        # 2. valid parameters - supply txid and 0 for non-verbose
        assert_equal(
            self.nodes[0].getrawtransaction(txHash, 0), rawTxSigned['hex'])

        # 3. valid parameters - supply txid and False for non-verbose
        assert_equal(self.nodes[0].getrawtransaction(
            txHash, False), rawTxSigned['hex'])

        # 4. valid parameters - supply txid and 1 for verbose.
        # We only check the "hex" field of the output so we don't need to
        # update this test every time the output format changes.
        assert_equal(self.nodes[0].getrawtransaction(
            txHash, 1)["hex"], rawTxSigned['hex'])

        # 5. valid parameters - supply txid and True for verbose
        assert_equal(self.nodes[0].getrawtransaction(
            txHash, True)["hex"], rawTxSigned['hex'])

        # 6. invalid parameters - supply txid and string "False"
        assert_raises_rpc_error(
            -3, "Invalid type", self.nodes[0].getrawtransaction, txHash, "False")

        # 7. invalid parameters - supply txid and empty array
        assert_raises_rpc_error(
            -3, "Invalid type", self.nodes[0].getrawtransaction, txHash, [])

        # 8. invalid parameters - supply txid and empty dict
        assert_raises_rpc_error(
            -3, "Invalid type", self.nodes[0].getrawtransaction, txHash, {})

        # Sanity checks on verbose getrawtransaction output
        rawTxOutput = self.nodes[0].getrawtransaction(txHash, True)
        assert_equal(rawTxOutput["hex"], rawTxSigned["hex"])
        assert_equal(rawTxOutput["txid"], txHash)
        assert_equal(rawTxOutput["hash"], txHash)
        assert_greater_than(rawTxOutput["size"], 300)
        assert_equal(rawTxOutput["version"], 0x02)
        assert_equal(rawTxOutput["locktime"], 0)
        assert_equal(len(rawTxOutput["vin"]), 1)
        assert_equal(len(rawTxOutput["vout"]), 1)
        assert_equal(rawTxOutput["blockhash"], rawTxBlock["hash"])
        # The containing block plus the two blocks generated after it.
        assert_equal(rawTxOutput["confirmations"], 3)
        assert_equal(rawTxOutput["time"], rawTxBlock["time"])
        assert_equal(rawTxOutput["blocktime"], rawTxBlock["time"])

        # createrawtransaction input validation: vout
        inputs = [
            {'txid': "1d1d4e24ed99057e84c3f80fd8fbec79ed9e1acee37da269356ecea000000000", 'sequence': 1000}]
        outputs = {self.nodes[0].getnewaddress(): 1}
        assert_raises_rpc_error(
            -8, 'Invalid parameter, missing vout key',
            self.nodes[0].createrawtransaction, inputs, outputs)

        inputs[0]['vout'] = "1"
        assert_raises_rpc_error(
            -8, 'Invalid parameter, vout must be a number',
            self.nodes[0].createrawtransaction, inputs, outputs)

        inputs[0]['vout'] = -1
        assert_raises_rpc_error(
            -8, 'Invalid parameter, vout must be positive',
            self.nodes[0].createrawtransaction, inputs, outputs)

        inputs[0]['vout'] = 1
        rawtx = self.nodes[0].createrawtransaction(inputs, outputs)
        decrawtx = self.nodes[0].decoderawtransaction(rawtx)
        assert_equal(decrawtx['vin'][0]['sequence'], 1000)

        # 9. invalid parameters - sequence number out of range
        inputs[0]['sequence'] = -1
        assert_raises_rpc_error(
            -8, 'Invalid parameter, sequence number is out of range',
            self.nodes[0].createrawtransaction, inputs, outputs)

        # 10. invalid parameters - sequence number out of range
        inputs[0]['sequence'] = 4294967296
        assert_raises_rpc_error(
            -8, 'Invalid parameter, sequence number is out of range',
            self.nodes[0].createrawtransaction, inputs, outputs)

        inputs[0]['sequence'] = 4294967294
        rawtx = self.nodes[0].createrawtransaction(inputs, outputs)
        decrawtx = self.nodes[0].decoderawtransaction(rawtx)
        assert_equal(decrawtx['vin'][0]['sequence'], 4294967294)


if __name__ == '__main__':
    RawTransactionsTest().main()
diff --git a/test/functional/rpc_signrawtransaction.py b/test/functional/rpc_signrawtransaction.py
index 61cde13324..16e232f814 100755
--- a/test/functional/rpc_signrawtransaction.py
+++ b/test/functional/rpc_signrawtransaction.py
@@ -1,192 +1,192 @@
#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
+from test_framework.util import assert_equal, assert_raises_rpc_error
class SignRawTransactionsTest(BitcoinTestFramework):
    """Checks signrawtransaction results for good, bad and sighash-varied inputs."""

    def set_test_params(self):
        """Request a single node on a clean chain."""
        self.num_nodes = 1
        self.setup_clean_chain = True

    def successful_signing_test(self):
        """Create and sign a valid raw transaction.

        Expected results:

        1) The transaction has a complete set of signatures
        2) No script verification error occurred
        """
        node = self.nodes[0]
        signing_keys = [
            'cUeKHd5orzT3mz8P9pxyREHfsWtVfgsfDjiZZBcjUBAaGk1BTj7N',
            'cVKpPfVKSJxKqVpE9awvXNWuLHCa5j5tiE7K6zbUSptFpTEtiFrA',
        ]

        # Valid pay-to-pubkey scripts
        prevouts = [
            {
                'txid': '9b907ef1e3c26fc71fe4a4b3580bc75264112f95050014157059c736f0202e71',
                'vout': 0,
                'amount': 3.14159,
                'scriptPubKey': '76a91460baa0f494b38ce3c940dea67f3804dc52d1fb9488ac',
            },
            {
                'txid': '83a4f6a6b73660e13ee6cb3c6063fa3759c50c9b7521d0536022961898f4fb02',
                'vout': 0,
                'amount': '123.456',
                'scriptPubKey': '76a914669b857c03a5ed269d5d85a1ffac9ed5d663072788ac',
            },
        ]
        payouts = {'mpLQjfK79b7CCV4VMJWEWAj5Mpx8Up5zxB': 0.1}

        unsigned_hex = node.createrawtransaction(prevouts, payouts)
        result = node.signrawtransaction(unsigned_hex, prevouts, signing_keys)

        # 1) The transaction has a complete set of signatures
        assert 'complete' in result
        assert_equal(result['complete'], True)

        # 2) No script verification error occurred
        assert 'errors' not in result

    def script_verification_error_test(self):
        """Sign a transaction with one valid, one invalid and one missing input script.

        vin 0 is valid, vin 1 has an invalid script and vin 2 has no
        scriptPubKey supplied at all.

        Expected results:

        3) The transaction has no complete set of signatures
        4) Two script verification errors occurred
        5) Script verification errors have certain properties
           ("txid", "vout", "scriptSig", "sequence", "error")
        6) The verification errors refer to the invalid (vin 1) and missing
           input (vin 2)
        """
        node = self.nodes[0]
        signing_keys = ['cUeKHd5orzT3mz8P9pxyREHfsWtVfgsfDjiZZBcjUBAaGk1BTj7N']

        inputs = [
            # Valid pay-to-pubkey script
            {'txid': '9b907ef1e3c26fc71fe4a4b3580bc75264112f95050014157059c736f0202e71',
             'vout': 0, 'amount': 0},
            # Invalid script
            {'txid': '5b8673686910442c644b1f4993d8f7753c7c8fcb5c87ee40d56eaeef25204547',
             'vout': 7, 'amount': '1.1'},
            # Missing scriptPubKey
            {'txid': '9b907ef1e3c26fc71fe4a4b3580bc75264112f95050014157059c736f0202e71',
             'vout': 1, 'amount': 2.0},
        ]

        # Script data is supplied for the first two inputs only: same
        # txid/vout/amount as above, with a scriptPubKey appended.
        scripts = [
            # Valid pay-to-pubkey script
            dict(inputs[0],
                 scriptPubKey='76a91460baa0f494b38ce3c940dea67f3804dc52d1fb9488ac'),
            # Invalid script
            dict(inputs[1], scriptPubKey='badbadbadbad'),
        ]

        payouts = {'mpLQjfK79b7CCV4VMJWEWAj5Mpx8Up5zxB': 0.1}
        unsigned_hex = node.createrawtransaction(inputs, payouts)

        # Make sure decoderawtransaction is at least marginally sane
        decoded = node.decoderawtransaction(unsigned_hex)
        for position, expected in enumerate(inputs):
            decoded_vin = decoded["vin"][position]
            assert_equal(decoded_vin["txid"], expected["txid"])
            assert_equal(decoded_vin["vout"], expected["vout"])

        # Make sure decoderawtransaction throws if there is extra data
        assert_raises_rpc_error(
            -22, "TX decode failed",
            node.decoderawtransaction, unsigned_hex + "00")

        result = node.signrawtransaction(unsigned_hex, scripts, signing_keys)

        # 3) The transaction has no complete set of signatures
        assert 'complete' in result
        assert_equal(result['complete'], False)

        # 4) Two script verification errors occurred
        assert 'errors' in result
        assert_equal(len(result['errors']), 2)

        # 5) Script verification errors have certain properties
        for field in ('txid', 'vout', 'scriptSig', 'sequence', 'error'):
            assert field in result['errors'][0]

        # 6) The verification errors refer to the invalid (vin 1) and missing
        # input (vin 2)
        for error_index, input_index in ((0, 1), (1, 2)):
            reported = result['errors'][error_index]
            assert_equal(reported['txid'], inputs[input_index]['txid'])
            assert_equal(reported['vout'], inputs[input_index]['vout'])

    def test_sighashes(self):
        """Sign the same transaction under a range of sighash strings.

        Expected result:

        1) The transaction is complete if the sighash is valid and has FORKID.
        2) The RPC throws an error if the sighash does not contain FORKID.
        3) The RPC throws an error if the sighash is invalid.
        """
        node = self.nodes[0]
        signing_keys = ['cUeKHd5orzT3mz8P9pxyREHfsWtVfgsfDjiZZBcjUBAaGk1BTj7N']

        # Valid pay-to-pubkey script
        prevouts = [{
            'txid': '9b907ef1e3c26fc71fe4a4b3580bc75264112f95050014157059c736f0202e71',
            'vout': 0,
            'amount': 3.14159,
            'scriptPubKey': '76a91460baa0f494b38ce3c940dea67f3804dc52d1fb9488ac',
        }]
        payouts = {'mpLQjfK79b7CCV4VMJWEWAj5Mpx8Up5zxB': 0.1}
        unsigned_hex = node.createrawtransaction(prevouts, payouts)

        accepted = (
            "ALL|FORKID",
            "NONE|FORKID",
            "SINGLE|FORKID",
            "ALL|FORKID|ANYONECANPAY",
            "NONE|FORKID|ANYONECANPAY",
            "SINGLE|FORKID|ANYONECANPAY",
        )
        lacking_forkid = (
            "ALL",
            "NONE",
            "SINGLE",
            "ALL|ANYONECANPAY",
            "NONE|ANYONECANPAY",
            "SINGLE|ANYONECANPAY",
        )
        malformed = (
            "",
            "ALL|SINGLE|FORKID",
            str(0),
            str(0x20),
        )

        # 1) If the sighash is valid with FORKID, the signature is complete
        for sighash in accepted:
            result = node.signrawtransaction(
                unsigned_hex, prevouts, signing_keys, sighash)
            assert 'complete' in result
            assert_equal(result['complete'], True)
            assert 'errors' not in result

        # 2) If FORKID is missing in the sighash, the RPC throws an error
        # 3) If the sighash is invalid the RPC throws an error
        rejected_groups = (
            ("Signature must use SIGHASH_FORKID", lacking_forkid),
            ("Invalid sighash param", malformed),
        )
        for message, sighashes in rejected_groups:
            for sighash in sighashes:
                assert_raises_rpc_error(
                    -8, message, node.signrawtransaction,
                    unsigned_hex, prevouts, signing_keys, sighash)

    def run_test(self):
        """Run the three signing scenarios in order."""
        self.successful_signing_test()
        self.script_verification_error_test()
        self.test_sighashes()


if __name__ == '__main__':
    SignRawTransactionsTest().main()
diff --git a/test/functional/rpc_txoutproof.py b/test/functional/rpc_txoutproof.py
index d1abd1e0b5..af30e21711 100755
--- a/test/functional/rpc_txoutproof.py
+++ b/test/functional/rpc_txoutproof.py
@@ -1,105 +1,109 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test merkleblock fetch/validation
#
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import *
+from test_framework.util import (
+ assert_equal,
+ assert_raises_rpc_error,
+ connect_nodes,
+)
class MerkleBlockTest(BitcoinTestFramework):
    """Fetches proofs with gettxoutproof and validates them with verifytxoutproof."""

    def set_test_params(self):
        """Request four nodes on a clean chain; only the last gets -txindex."""
        self.setup_clean_chain = True
        self.num_nodes = 4
        # Nodes 0/1 are "wallet" nodes, Nodes 2/3 are used for testing
        self.extra_args = [[], [], [], ["-txindex"]]

    def setup_network(self):
        """Connect node0 to each of the other three nodes, then sync."""
        self.setup_nodes()
        for peer_index in (1, 2, 3):
            connect_nodes(self.nodes[0], self.nodes[peer_index])
        self.sync_all()

    def run_test(self):
        """Check proof retrieval for unconfirmed, unspent and spent transactions."""
        wallet0 = self.nodes[0]
        wallet1 = self.nodes[1]
        prover = self.nodes[2]

        self.log.info("Mining blocks...")
        wallet0.generate(105)
        self.sync_all()

        chain_height = wallet1.getblockcount()
        assert_equal(chain_height, 105)
        assert_equal(wallet1.getbalance(), 0)
        assert_equal(prover.getbalance(), 0)

        # Send two separate 49.99 payments from node0 to node1.
        spendable = wallet0.listunspent(1)
        sent_txids = []
        for _ in range(2):
            unsigned = wallet0.createrawtransaction(
                [spendable.pop()], {wallet1.getnewaddress(): 49.99})
            signed_hex = wallet0.signrawtransaction(unsigned)["hex"]
            sent_txids.append(wallet0.sendrawtransaction(signed_hex))
        txid1, txid2 = sent_txids

        # This will raise an exception because the transaction is not yet in a block
        assert_raises_rpc_error(
            -5, "Transaction not yet in block", wallet0.gettxoutproof, [txid1])

        wallet0.generate(1)
        blockhash = wallet0.getblockhash(chain_height + 1)
        self.sync_all()

        # Entries 1 and 2 of the block's tx list (entry 0 is skipped).
        block_txids = wallet0.getblock(blockhash, True)["tx"]
        txlist = [block_txids[1], block_txids[2]]

        proof = prover.gettxoutproof([txid1])
        assert_equal(prover.verifytxoutproof(proof), [txid1])
        proof = prover.gettxoutproof([txid1, txid2])
        assert_equal(prover.verifytxoutproof(proof), txlist)
        proof = prover.gettxoutproof([txid1, txid2], blockhash)
        assert_equal(prover.verifytxoutproof(proof), txlist)

        # Spend one of node1's outputs back to node0 and mine it.
        txin_spent = wallet1.listunspent(1).pop()
        tx3 = wallet1.createrawtransaction(
            [txin_spent], {wallet0.getnewaddress(): 49.98})
        signed_tx3 = wallet1.signrawtransaction(tx3)["hex"]
        txid3 = wallet0.sendrawtransaction(signed_tx3)
        wallet0.generate(1)
        self.sync_all()

        txid_spent = txin_spent["txid"]
        txid_unspent = txid2 if txid_spent == txid1 else txid1

        # We can't find the block from a fully-spent tx
        assert_raises_rpc_error(
            -5, "Transaction not yet in block",
            prover.gettxoutproof, [txid_spent])

        # We can get the proof if we specify the block
        proof = prover.gettxoutproof([txid_spent], blockhash)
        assert_equal(prover.verifytxoutproof(proof), [txid_spent])

        # We can't get the proof if we specify a non-existent block
        assert_raises_rpc_error(
            -5, "Block not found", prover.gettxoutproof,
            [txid_spent], "00000000000000000000000000000000")

        # We can get the proof if the transaction is unspent
        proof = prover.gettxoutproof([txid_unspent])
        assert_equal(prover.verifytxoutproof(proof), [txid_unspent])

        # We can get the proof if we provide a list of transactions and one
        # of them is unspent. The ordering of the list should not matter.
        for requested in ([txid1, txid2], [txid2, txid1]):
            proof = prover.gettxoutproof(requested)
            assert_equal(
                sorted(prover.verifytxoutproof(proof)), sorted(txlist))

        # We can always get a proof if we have a -txindex
        proof = self.nodes[3].gettxoutproof([txid_spent])
        assert_equal(prover.verifytxoutproof(proof), [txid_spent])

        # We can't get a proof if we specify transactions from different blocks
        assert_raises_rpc_error(
            -5, "Not all transactions found in specified or retrieved block",
            prover.gettxoutproof, [txid1, txid3])


if __name__ == '__main__':
    MerkleBlockTest().main()
diff --git a/test/functional/rpc_users.py b/test/functional/rpc_users.py
index 8ff382c217..ba57fde2bd 100755
--- a/test/functional/rpc_users.py
+++ b/test/functional/rpc_users.py
@@ -1,159 +1,159 @@
#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
#
# Test multiple rpc user config option rpcauth
#
-from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import str_to_b64str, assert_equal
-
-import os
import http.client
+import os
import urllib.parse
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import assert_equal, str_to_b64str
+
class HTTPBasicsTest (BitcoinTestFramework):
    """Check HTTP Basic authentication against rpcauth and rpcuser/rpcpassword."""

    def set_test_params(self):
        """Request two nodes: node0 gets rpcauth, node1 gets rpcuser/rpcpassword."""
        self.num_nodes = 2

    def setup_chain(self):
        """Append the credential settings to each node's bitcoin.conf."""
        super().setup_chain()
        # Append rpcauth to bitcoin.conf before initialization
        rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144"
        rpcauth2 = "rpcauth=rt2:f8607b1a88861fac29dfccf9b52ff9f$ff36a0c23c8c62b4846112e50fa888416e94c17bfd4c42f88fd8f55ec6a3137e"
        rpcuser = "rpcuser=rpcuser💻"
        rpcpassword = "rpcpassword=rpcpassword🔑"

        extra_settings = (
            ("node0", (rpcauth, rpcauth2)),
            ("node1", (rpcuser, rpcpassword)),
        )
        for node_dir, settings in extra_settings:
            conf_path = os.path.join(
                self.options.tmpdir, node_dir, "bitcoin.conf")
            with open(conf_path, 'a', encoding='utf8') as f:
                for setting in settings:
                    f.write(setting + "\n")

    def _assert_auth_status(self, url, authpair, expected_status):
        """POST a getbestblockhash request and check the HTTP status code.

        url: parsed node URL (a urllib.parse.urlparse result); only its
            hostname and port are used.
        authpair: "user:password" string sent as HTTP Basic credentials.
        expected_status: status code the response must carry.
        """
        headers = {"Authorization": "Basic " + str_to_b64str(authpair)}
        conn = http.client.HTTPConnection(url.hostname, url.port)
        try:
            conn.connect()
            conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
            resp = conn.getresponse()
            assert_equal(resp.status, expected_status)
        finally:
            # Close even when the assertion fails so the socket is not leaked.
            conn.close()

    def run_test(self):
        """Try valid and invalid credentials against both nodes."""
        #
        # Check correctness of the rpcauth config option #
        #
        url = urllib.parse.urlparse(self.nodes[0].url)

        # Old authpair
        authpair = url.username + ':' + url.password

        # Passwords matching the rpcauth entries for users "rt" and "rt2"
        # written in setup_chain (generated via the share/rpcuser tool).
        password = "cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM="
        password2 = "8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI="

        self._assert_auth_status(url, authpair, 200)

        # Use new authpair to confirm both work
        self._assert_auth_status(url, "rt:" + password, 200)

        # Wrong login name with rt's password
        self._assert_auth_status(url, "rtwrong:" + password, 401)

        # Wrong password for rt
        self._assert_auth_status(url, "rt:" + password + "wrong", 401)

        # Correct for rt2
        self._assert_auth_status(url, "rt2:" + password2, 200)

        # Wrong password for rt2
        self._assert_auth_status(url, "rt2:" + password2 + "wrong", 401)

        ###############################################################
        # Check correctness of the rpcuser/rpcpassword config options #
        ###############################################################
        url = urllib.parse.urlparse(self.nodes[1].url)

        # rpcuser and rpcpassword authpair
        self._assert_auth_status(url, "rpcuser💻:rpcpassword🔑", 200)

        # Wrong login name with rpcuser's password. Only the user name is
        # altered, so the configured password alone must not be enough.
        self._assert_auth_status(url, "rpcuser💻wrong:rpcpassword🔑", 401)

        # Wrong password for rpcuser. Only the password is altered, so the
        # configured user name alone must not be enough.
        self._assert_auth_status(url, "rpcuser💻:rpcpassword🔑wrong", 401)


if __name__ == '__main__':
    HTTPBasicsTest().main()

File Metadata

Mime Type
text/x-diff
Expires
Wed, May 21, 19:55 (1 d, 8 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5865824
Default Alt Text
(102 KB)

Event Timeline