Page MenuHomePhabricator

No OneTemporary

diff --git a/test/functional/feature_assumevalid.py b/test/functional/feature_assumevalid.py
index d8ec6747d..12374deed 100644
--- a/test/functional/feature_assumevalid.py
+++ b/test/functional/feature_assumevalid.py
@@ -1,185 +1,184 @@
# Copyright (c) 2014-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test logic for skipping signature validation on old blocks.
Test logic for skipping signature validation on blocks which we've assumed
valid (https://github.com/bitcoin/bitcoin/pull/9484)
We build a chain that includes an invalid signature for one of the
transactions:
0: genesis block
1: block 1 with coinbase transaction output.
2-101: bury that block with 100 blocks so the coinbase transaction
output can be spent
102: a block containing a transaction spending the coinbase
transaction output. The transaction has an invalid signature.
103-2202: bury the bad block with just over two weeks' worth of blocks
(2100 blocks)
Start three nodes:
- node0 has no -assumevalid parameter. Try to sync to block 2202. It will
reject block 102 and only sync as far as block 101
- node1 has -assumevalid set to the hash of block 102. Try to sync to
block 2202. node1 will sync all the way to block 2202.
- node2 has -assumevalid set to the hash of block 102. Try to sync to
block 200. node2 will reject block 102 since it's assumed valid, but it
isn't buried by at least two weeks' work.
"""
from test_framework.blocktools import COINBASE_MATURITY, create_block, create_coinbase
from test_framework.key import ECKey
from test_framework.messages import (
CBlockHeader,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
msg_block,
msg_headers,
)
from test_framework.p2p import P2PInterface
from test_framework.script import OP_TRUE, CScript
from test_framework.test_framework import BitcoinTestFramework
from test_framework.txtools import pad_tx
from test_framework.util import assert_equal
class BaseNode(P2PInterface):
    """Minimal P2P stub that can announce blocks via headers messages."""

    def send_header_for_blocks(self, new_blocks):
        """Announce `new_blocks` to the node in a single headers message."""
        msg = msg_headers()
        msg.headers = [CBlockHeader(b) for b in new_blocks]
        self.send_message(msg)
class AssumeValidTest(BitcoinTestFramework):
    def set_test_params(self):
        """Three nodes, starting from a clean chain."""
        self.setup_clean_chain = True
        self.num_nodes = 3
        # Need a bit of extra time when running with the thread sanitizer
        self.rpc_timeout = 120
def setup_network(self):
    """Bring up node0 only; nodes 1 and 2 are started later with -assumevalid."""
    self.add_nodes(3)
    # Start node0. We don't start the other nodes yet since
    # we need to pre-mine a block with an invalid transaction
    # signature so we can pass in the block hash as assumevalid.
    self.start_node(0)
def send_blocks_until_disconnected(self, p2p_conn):
    """Keep sending blocks to the node until we're disconnected."""
    for block in self.blocks:
        # Stop early once the node has dropped the connection.
        if not p2p_conn.is_connected:
            break
        try:
            p2p_conn.send_message(msg_block(block))
        except IOError:
            # A send failure is only expected if the peer disconnected us.
            assert not p2p_conn.is_connected
            break
def run_test(self):
    """Build a chain whose block 102 contains an invalid signature, then check
    how nodes with and without -assumevalid sync it."""
    # Build the blockchain
    self.tip = int(self.nodes[0].getbestblockhash(), 16)
    self.block_time = (
        self.nodes[0].getblock(self.nodes[0].getbestblockhash())["time"] + 1
    )
    self.blocks = []

    # Get a pubkey for the coinbase TXO
    coinbase_key = ECKey()
    coinbase_key.generate()
    coinbase_pubkey = coinbase_key.get_pubkey().get_bytes()

    # Create the first block with a coinbase output to our key
    height = 1
    block = create_block(
        self.tip, create_coinbase(height, coinbase_pubkey), self.block_time
    )
    self.blocks.append(block)
    self.block_time += 1
    block.solve()
    # Save the coinbase for later
    self.block1 = block
    self.tip = block.sha256
    height += 1

    # Bury the block 100 deep so the coinbase output is spendable
    for _ in range(COINBASE_MATURITY):
        block = create_block(self.tip, create_coinbase(height), self.block_time)
        block.solve()
        self.blocks.append(block)
        self.tip = block.sha256
        self.block_time += 1
        height += 1

    # Create a transaction spending the coinbase output with an invalid
    # (null) signature
    tx = CTransaction()
    tx.vin.append(CTxIn(COutPoint(self.block1.vtx[0].sha256, 0), scriptSig=b""))
    tx.vout.append(CTxOut(49 * 100000000, CScript([OP_TRUE])))
    pad_tx(tx)
    tx.calc_sha256()

    block102 = create_block(self.tip, create_coinbase(height), self.block_time)
    self.block_time += 1
    block102.vtx.extend([tx])
    block102.hashMerkleRoot = block102.calc_merkle_root()
    block102.solve()
    self.blocks.append(block102)
    self.tip = block102.sha256
    self.block_time += 1
    height += 1

    # Bury the assumed valid block 2100 deep
    for _ in range(2100):
        block = create_block(self.tip, create_coinbase(height), self.block_time)
        block.nVersion = 4
        block.solve()
        self.blocks.append(block)
        self.tip = block.sha256
        self.block_time += 1
        height += 1

    # Start node1 and node2 with assumevalid so they accept a block with a
    # bad signature.
    self.start_node(1, extra_args=[f"-assumevalid={hex(block102.sha256)}"])
    self.start_node(2, extra_args=[f"-assumevalid={hex(block102.sha256)}"])

    p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
    p2p0.send_header_for_blocks(self.blocks[0:2000])
    p2p0.send_header_for_blocks(self.blocks[2000:])

    # Send blocks to node0. Block 102 will be rejected.
    self.send_blocks_until_disconnected(p2p0)
    self.wait_until(lambda: self.nodes[0].getblockcount() >= COINBASE_MATURITY + 1)
    assert_equal(self.nodes[0].getblockcount(), COINBASE_MATURITY + 1)

    p2p1 = self.nodes[1].add_p2p_connection(BaseNode())
    p2p1.send_header_for_blocks(self.blocks[0:2000])
    p2p1.send_header_for_blocks(self.blocks[2000:])

    # Send all blocks to node1. All blocks will be accepted.
    for i in range(2202):
        p2p1.send_message(msg_block(self.blocks[i]))
    # Syncing 2200 blocks can take a while on slow systems. Give it plenty
    # of time to sync.
    p2p1.sync_with_ping(960)
    assert_equal(
        self.nodes[1].getblock(self.nodes[1].getbestblockhash())["height"], 2202
    )

    p2p2 = self.nodes[2].add_p2p_connection(BaseNode())
    p2p2.send_header_for_blocks(self.blocks[0:200])

    # Send blocks to node2. Block 102 will be rejected.
    self.send_blocks_until_disconnected(p2p2)
    self.wait_until(lambda: self.nodes[2].getblockcount() >= COINBASE_MATURITY + 1)
    assert_equal(self.nodes[2].getblockcount(), COINBASE_MATURITY + 1)
# Script entry point.
if __name__ == "__main__":
    AssumeValidTest().main()
diff --git a/test/functional/feature_bip68_sequence.py b/test/functional/feature_bip68_sequence.py
index 254c89ca5..9bb54f398 100644
--- a/test/functional/feature_bip68_sequence.py
+++ b/test/functional/feature_bip68_sequence.py
@@ -1,534 +1,532 @@
# Copyright (c) 2014-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test BIP68 implementation."""
import time
from test_framework.blocktools import create_block
from test_framework.messages import (
XEC,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
FromHex,
ToHex,
)
from test_framework.script import CScript
from test_framework.test_framework import BitcoinTestFramework
from test_framework.txtools import pad_tx
from test_framework.util import (
assert_equal,
assert_greater_than,
assert_raises_rpc_error,
satoshi_round,
)
from test_framework.wallet import MiniWallet
# nSequence bit 31: when set, the relative lock-time is disabled entirely.
SEQUENCE_LOCKTIME_DISABLE_FLAG = 1 << 31
# this means use time (0 means height)
SEQUENCE_LOCKTIME_TYPE_FLAG = 1 << 22
# this is a bit-shift
SEQUENCE_LOCKTIME_GRANULARITY = 9
# Mask extracting the 16-bit lock-time value from nSequence.
SEQUENCE_LOCKTIME_MASK = 0x0000FFFF
# RPC error for non-BIP68 final transactions
NOT_FINAL_ERROR = "non-BIP68-final"
class BIP68Test(BitcoinTestFramework):
    def set_test_params(self):
        """Two nodes: node0 accepts non-standard txs, node1 rejects them."""
        self.num_nodes = 2
        self.extra_args = [
            ["-noparkdeepreorg", "-acceptnonstdtxn=1"],
            ["-acceptnonstdtxn=0", "-automaticunparking=1"],
        ]
def run_test(self):
    """Run the BIP68 sub-tests in order, then activate CSV and re-check
    that version-2 transactions still relay."""
    self.relayfee = self.nodes[0].getnetworkinfo()["relayfee"]
    self.wallet = MiniWallet(self.nodes[0])

    self.log.info("Running test disable flag")
    self.test_disable_flag()
    self.log.info("Running test sequence-lock-confirmed-inputs")
    self.test_sequence_lock_confirmed_inputs()
    self.log.info("Running test sequence-lock-unconfirmed-inputs")
    self.test_sequence_lock_unconfirmed_inputs()
    self.log.info("Running test BIP68 not consensus before versionbits activation")
    self.test_bip68_not_consensus()
    self.log.info("Activating BIP68 (and 112/113)")
    self.activateCSV()

    # Use the framework logger rather than bare print() so these messages
    # are captured like every other status line in this test.
    self.log.info("Verifying nVersion=2 transactions are standard.")
    self.log.info(
        "Note that with current versions of bitcoin software, nVersion=2"
        " transactions are always standard (independent of BIP68 activation"
        " status)."
    )
    self.test_version2_relay()
    self.log.info("Passed")
def test_disable_flag(self):
    """Test that BIP68 is not in effect if tx version is 1, or if the
    first sequence bit (the disable flag) is set."""
    utxo = self.wallet.send_self_transfer(from_node=self.nodes[0])["new_utxo"]

    tx1 = CTransaction()
    value = int(satoshi_round(utxo["value"] - self.relayfee) * XEC)

    # Check that the disable flag disables relative locktime.
    # If sequence locks were used, this would require 1 block for the
    # input to mature.
    sequence_value = SEQUENCE_LOCKTIME_DISABLE_FLAG | 1
    tx1.vin = [
        CTxIn(
            COutPoint(int(utxo["txid"], 16), utxo["vout"]),
            nSequence=sequence_value,
        )
    ]
    tx1.vout = [CTxOut(value, CScript([b"a"]))]
    pad_tx(tx1)

    self.wallet.sign_tx(tx=tx1)
    tx1_id = self.wallet.sendrawtransaction(
        from_node=self.nodes[0], tx_hex=tx1.serialize().hex()
    )
    tx1_id = int(tx1_id, 16)

    # This transaction will enable sequence-locks, so this transaction should
    # fail
    tx2 = CTransaction()
    tx2.nVersion = 2
    sequence_value = sequence_value & 0x7FFFFFFF
    tx2.vin = [CTxIn(COutPoint(tx1_id, 0), nSequence=sequence_value)]
    tx2.vout = [CTxOut(int(value - self.relayfee * XEC), CScript([b"a"]))]
    pad_tx(tx2)
    tx2.rehash()

    assert_raises_rpc_error(
        -26,
        NOT_FINAL_ERROR,
        self.wallet.sendrawtransaction,
        from_node=self.nodes[0],
        tx_hex=tx2.serialize().hex(),
    )

    # Setting the version back down to 1 should disable the sequence lock,
    # so this should be accepted.
    tx2.nVersion = 1
    self.wallet.sendrawtransaction(
        from_node=self.nodes[0], tx_hex=tx2.serialize().hex()
    )
def get_median_time_past(self, confirmations):
    """Return the median time past of the block `confirmations` blocks
    before the current tip."""
    target_height = self.nodes[0].getblockcount() - confirmations
    block_hash = self.nodes[0].getblockhash(target_height)
    return self.nodes[0].getblockheader(block_hash)["mediantime"]
def test_sequence_lock_confirmed_inputs(self):
    """Test that sequence locks are respected for transactions spending
    confirmed inputs."""
    # Bug fix: the original code ran `import random` inside the while loop
    # below, so if the wallet already held >= 200 utxos the loop body never
    # executed and the later random.* calls raised NameError. Import once,
    # up front.
    import random

    # Create lots of confirmed utxos, and use them to generate lots of random
    # transactions.
    max_outputs = 50
    while (
        len(
            self.wallet.get_utxos(
                include_immature_coinbase=False, mark_as_spent=False
            )
        )
        < 200
    ):
        num_outputs = random.randint(1, max_outputs)
        self.wallet.send_self_transfer_multi(
            from_node=self.nodes[0], num_outputs=num_outputs
        )
        self.generate(self.wallet, 1)
    utxos = self.wallet.get_utxos(include_immature_coinbase=False)

    # Try creating a lot of random transactions.
    # Each time, choose a random number of inputs, and randomly set
    # some of those inputs to be sequence locked (and randomly choose
    # between height/time locking). Small random chance of making the locks
    # all pass.
    for _ in range(400):
        # Randomly choose up to 10 inputs
        num_inputs = random.randint(1, 10)
        random.shuffle(utxos)

        # Track whether any sequence locks used should fail
        should_pass = True
        # Track whether this transaction was built with sequence locks
        using_sequence_locks = False

        tx = CTransaction()
        tx.nVersion = 2
        value = 0
        for j in range(num_inputs):
            # this disables sequence locks
            sequence_value = 0xFFFFFFFE

            # 50% chance we enable sequence locks
            if random.randint(0, 1):
                using_sequence_locks = True

                # 10% of the time, make the input sequence value pass
                input_will_pass = random.randint(1, 10) == 1
                sequence_value = utxos[j]["confirmations"]
                if not input_will_pass:
                    sequence_value += 1
                    should_pass = False

                # Figure out what the median-time-past was for the confirmed
                # input. Note that if an input has N confirmations, we're going
                # back N blocks from the tip so that we're looking up MTP of
                # the block PRIOR to the one the input appears in, as per the
                # BIP68 spec.
                orig_time = self.get_median_time_past(utxos[j]["confirmations"])
                # MTP of the tip
                cur_time = self.get_median_time_past(0)

                # can only timelock this input if it's not too old --
                # otherwise use height
                can_time_lock = True
                if (
                    (cur_time - orig_time) >> SEQUENCE_LOCKTIME_GRANULARITY
                ) >= SEQUENCE_LOCKTIME_MASK:
                    can_time_lock = False

                # if time-lockable, then 50% chance we make this a time
                # lock
                if random.randint(0, 1) and can_time_lock:
                    # Find first time-lock value that fails, or latest one
                    # that succeeds
                    time_delta = sequence_value << SEQUENCE_LOCKTIME_GRANULARITY
                    if input_will_pass and time_delta > cur_time - orig_time:
                        sequence_value = (
                            cur_time - orig_time
                        ) >> SEQUENCE_LOCKTIME_GRANULARITY
                    elif not input_will_pass and time_delta <= cur_time - orig_time:
                        sequence_value = (
                            (cur_time - orig_time) >> SEQUENCE_LOCKTIME_GRANULARITY
                        ) + 1
                    sequence_value |= SEQUENCE_LOCKTIME_TYPE_FLAG

            tx.vin.append(
                CTxIn(
                    COutPoint(int(utxos[j]["txid"], 16), utxos[j]["vout"]),
                    nSequence=sequence_value,
                )
            )
            value += utxos[j]["value"] * XEC
        # Overestimate the size of the tx - signatures should be less than
        # 120 bytes, and leave 50 for the output
        tx_size = len(ToHex(tx)) // 2 + 120 * num_inputs + 50
        tx.vout.append(
            CTxOut(
                int(value - self.relayfee * tx_size * XEC / 1000), CScript([b"a"])
            )
        )
        self.wallet.sign_tx(tx=tx)

        if using_sequence_locks and not should_pass:
            # This transaction should be rejected
            assert_raises_rpc_error(
                -26,
                NOT_FINAL_ERROR,
                self.wallet.sendrawtransaction,
                from_node=self.nodes[0],
                tx_hex=tx.serialize().hex(),
            )
        else:
            # This raw transaction should be accepted
            self.wallet.sendrawtransaction(
                from_node=self.nodes[0], tx_hex=tx.serialize().hex()
            )
            self.wallet.rescan_utxos()
            utxos = self.wallet.get_utxos(include_immature_coinbase=False)
def test_sequence_lock_unconfirmed_inputs(self):
    """Test that sequence locks on unconfirmed inputs must have nSequence
    height or time of 0 to be accepted, and that BIP68-invalid transactions
    are removed from the mempool after a reorg."""
    # Store height so we can easily reset the chain at the end of the test
    cur_height = self.nodes[0].getblockcount()

    # Create a mempool tx.
    self.wallet.rescan_utxos()
    tx1 = self.wallet.send_self_transfer(from_node=self.nodes[0])["tx"]
    tx1.rehash()

    # As the fees are calculated prior to the transaction being signed,
    # there is some uncertainty that calculate fee provides the correct
    # minimal fee. Since regtest coins are free, let's go ahead and
    # increase the fee by an order of magnitude to ensure this test
    # passes.
    fee_multiplier = 10

    # Anyone-can-spend mempool tx.
    # Sequence lock of 0 should pass.
    tx2 = CTransaction()
    tx2.nVersion = 2
    tx2.vin = [CTxIn(COutPoint(tx1.sha256, 0), nSequence=0)]
    tx2.vout = [CTxOut(int(0), CScript([b"a"]))]
    tx2.vout[0].nValue = tx1.vout[0].nValue - fee_multiplier * self.nodes[
        0
    ].calculate_fee(tx2)
    self.wallet.sign_tx(tx=tx2)
    tx2_raw = tx2.serialize().hex()
    tx2.rehash()
    self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx2_raw)

    def test_nonzero_locks(orig_tx, node, use_height_lock):
        # Create a spend of the 0th output of orig_tx with a sequence lock
        # of 1, and test what happens when submitting.
        # orig_tx.vout[0] must be an anyone-can-spend output.
        sequence_value = 1
        if not use_height_lock:
            sequence_value |= SEQUENCE_LOCKTIME_TYPE_FLAG

        tx = CTransaction()
        tx.nVersion = 2
        tx.vin = [CTxIn(COutPoint(orig_tx.sha256, 0), nSequence=sequence_value)]
        tx.vout = [
            CTxOut(
                int(
                    orig_tx.vout[0].nValue - fee_multiplier * node.calculate_fee(tx)
                ),
                CScript([b"a"]),
            )
        ]
        pad_tx(tx)
        tx.rehash()

        if orig_tx.hash in node.getrawmempool():
            # sendrawtransaction should fail if the tx is in the mempool
            assert_raises_rpc_error(
                -26,
                NOT_FINAL_ERROR,
                self.wallet.sendrawtransaction,
                from_node=node,
                tx_hex=tx.serialize().hex(),
            )
        else:
            # sendrawtransaction should succeed if the tx is not in the
            # mempool
            self.wallet.sendrawtransaction(
                from_node=node, tx_hex=tx.serialize().hex()
            )
        return tx

    test_nonzero_locks(tx2, self.nodes[0], use_height_lock=True)
    test_nonzero_locks(tx2, self.nodes[0], use_height_lock=False)

    # Now mine some blocks, but make sure tx2 doesn't get mined.
    # Use prioritisetransaction to lower the effective feerate to 0
    self.nodes[0].prioritisetransaction(
        txid=tx2.hash, fee_delta=-fee_multiplier * self.nodes[0].calculate_fee(tx2)
    )
    cur_time = int(time.time())
    for _ in range(10):
        self.nodes[0].setmocktime(cur_time + 600)
        self.generate(self.wallet, 1, sync_fun=self.no_op)
        cur_time += 600

    assert tx2.hash in self.nodes[0].getrawmempool()

    test_nonzero_locks(tx2, self.nodes[0], use_height_lock=True)
    test_nonzero_locks(tx2, self.nodes[0], use_height_lock=False)

    # Mine tx2, and then try again
    self.nodes[0].prioritisetransaction(
        txid=tx2.hash, fee_delta=fee_multiplier * self.nodes[0].calculate_fee(tx2)
    )

    # Advance the time on the node so that we can test timelocks
    self.nodes[0].setmocktime(cur_time + 600)
    # Save block template now to use for the reorg later
    tmpl = self.nodes[0].getblocktemplate()
    self.generate(self.nodes[0], 1)
    assert tx2.hash not in self.nodes[0].getrawmempool()

    # Now that tx2 is not in the mempool, a sequence locked spend should
    # succeed
    tx3 = test_nonzero_locks(tx2, self.nodes[0], use_height_lock=False)
    assert tx3.hash in self.nodes[0].getrawmempool()

    self.generate(self.nodes[0], 1)
    assert tx3.hash not in self.nodes[0].getrawmempool()

    # One more test, this time using height locks
    tx4 = test_nonzero_locks(tx3, self.nodes[0], use_height_lock=True)
    assert tx4.hash in self.nodes[0].getrawmempool()

    # Now try combining confirmed and unconfirmed inputs
    tx5 = test_nonzero_locks(tx4, self.nodes[0], use_height_lock=True)
    assert tx5.hash not in self.nodes[0].getrawmempool()

    utxo = self.wallet.get_utxo()
    tx5.vin.append(
        CTxIn(COutPoint(int(utxo["txid"], 16), utxo["vout"]), nSequence=1)
    )
    tx5.vout[0].nValue += int(utxo["value"] * XEC)
    self.wallet.sign_tx(tx=tx5)

    assert_raises_rpc_error(
        -26,
        NOT_FINAL_ERROR,
        self.wallet.sendrawtransaction,
        from_node=self.nodes[0],
        tx_hex=tx5.serialize().hex(),
    )

    # Test mempool-BIP68 consistency after reorg
    #
    # State of the transactions in the last blocks:
    # ... -> [ tx2 ] -> [ tx3 ]
    #         tip-1        tip
    # And currently tx4 is in the mempool.
    #
    # If we invalidate the tip, tx3 should get added to the mempool, causing
    # tx4 to be removed (fails sequence-lock).
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
    assert tx4.hash not in self.nodes[0].getrawmempool()
    assert tx3.hash in self.nodes[0].getrawmempool()

    # Now mine 2 empty blocks to reorg out the current tip (labeled tip-1 in
    # diagram above).
    # This would cause tx2 to be added back to the mempool, which in turn causes
    # tx3 to be removed.
    for i in range(2):
        block = create_block(tmpl=tmpl, ntime=cur_time)
        block.solve()
        tip = block.sha256
        assert_equal(
            None if i == 1 else "inconclusive",
            self.nodes[0].submitblock(ToHex(block)),
        )
        tmpl = self.nodes[0].getblocktemplate()
        tmpl["previousblockhash"] = f"{tip:x}"
        tmpl["transactions"] = []
        cur_time += 1

    mempool = self.nodes[0].getrawmempool()
    assert tx3.hash not in mempool
    assert tx2.hash in mempool

    # Reset the chain and get rid of the mocktimed-blocks
    self.nodes[0].setmocktime(0)
    self.nodes[0].invalidateblock(self.nodes[0].getblockhash(cur_height + 1))
    self.generate(self.wallet, 10, sync_fun=self.no_op)
def get_csv_status(self):
    """Return True once the chain height has reached CSV activation (576)."""
    return self.nodes[0].getblockchaininfo()["blocks"] >= 576
def test_bip68_not_consensus(self):
    """Make sure that BIP68 isn't being used to validate blocks prior to
    versionbits activation. If more blocks are mined prior to this test
    being run, then it's possible the test has activated the soft fork, and
    this test should be moved to run earlier, or deleted."""
    assert_equal(self.get_csv_status(), False)

    tx1 = self.wallet.send_self_transfer(from_node=self.nodes[0])["tx"]
    tx1.rehash()

    # Make an anyone-can-spend transaction
    tx2 = CTransaction()
    tx2.nVersion = 1
    tx2.vin = [CTxIn(COutPoint(tx1.sha256, 0), nSequence=0)]
    tx2.vout = [
        CTxOut(int(tx1.vout[0].nValue - self.relayfee * XEC), CScript([b"a"]))
    ]

    # sign tx2
    self.wallet.sign_tx(tx=tx2)
    tx2_raw = tx2.serialize().hex()
    tx2 = FromHex(tx2, tx2_raw)
    pad_tx(tx2)
    tx2.rehash()
    self.wallet.sendrawtransaction(from_node=self.nodes[0], tx_hex=tx2_raw)

    # Now make an invalid spend of tx2 according to BIP68
    # 100 block relative locktime
    sequence_value = 100

    tx3 = CTransaction()
    tx3.nVersion = 2
    tx3.vin = [CTxIn(COutPoint(tx2.sha256, 0), nSequence=sequence_value)]
    tx3.vout = [
        CTxOut(int(tx2.vout[0].nValue - self.relayfee * XEC), CScript([b"a"]))
    ]
    pad_tx(tx3)
    tx3.rehash()

    assert_raises_rpc_error(
        -26,
        NOT_FINAL_ERROR,
        self.wallet.sendrawtransaction,
        from_node=self.nodes[0],
        tx_hex=tx3.serialize().hex(),
    )

    # make a block that violates bip68; ensure that the tip updates
    block = create_block(tmpl=self.nodes[0].getblocktemplate())
    block.vtx.extend(sorted([tx1, tx2, tx3], key=lambda tx: tx.get_id()))
    block.hashMerkleRoot = block.calc_merkle_root()
    block.solve()

    assert_equal(None, self.nodes[0].submitblock(ToHex(block)))
    assert_equal(self.nodes[0].getbestblockhash(), block.hash)
def activateCSV(self):
    """Mine up to the CSV activation height (576) and leave the tip exactly
    at the activation point on both nodes."""
    # activation should happen at block height 576
    csv_activation_height = 576
    height = self.nodes[0].getblockcount()
    assert_greater_than(csv_activation_height - height, 1)
    self.generate(
        self.wallet, csv_activation_height - height - 1, sync_fun=self.no_op
    )
    assert_equal(self.get_csv_status(), False)
    self.disconnect_nodes(0, 1)
    self.generate(self.wallet, 1, sync_fun=self.no_op)
    assert_equal(self.get_csv_status(), True)
    # We have a block that has CSV activated, but we want to be at
    # the activation point, so we invalidate the tip.
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
    self.connect_nodes(0, 1)
    self.sync_blocks()
def test_version2_relay(self):
    """Use self.nodes[1] to test that version 2 transactions are standard."""
    mini_wallet = MiniWallet(self.nodes[1])
    mini_wallet.rescan_utxos()
    spend_tx = mini_wallet.create_self_transfer()["tx"]
    spend_tx.nVersion = 2
    mini_wallet.sendrawtransaction(
        from_node=self.nodes[1], tx_hex=spend_tx.serialize().hex()
    )
# Script entry point.
if __name__ == "__main__":
    BIP68Test().main()
diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py
index 2ce661a2f..713caca90 100644
--- a/test/functional/feature_block.py
+++ b/test/functional/feature_block.py
@@ -1,1362 +1,1361 @@
# Copyright (c) 2015-2017 The Bitcoin Core developers
# Copyright (c) 2019 The Bitcoin developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test block processing."""
import copy
import struct
import time
from data import invalid_txs
from test_framework.blocktools import (
create_block,
create_coinbase,
create_tx_with_script,
make_conform_to_ctor,
)
from test_framework.cdefs import LEGACY_MAX_BLOCK_SIZE
from test_framework.key import ECKey
from test_framework.messages import (
COIN,
CBlock,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
uint256_from_compact,
uint256_from_str,
)
from test_framework.p2p import P2PDataStore
from test_framework.script import (
OP_ELSE,
OP_ENDIF,
OP_FALSE,
OP_IF,
OP_INVALIDOPCODE,
OP_RETURN,
OP_TRUE,
CScript,
)
from test_framework.signature_hash import (
SIGHASH_ALL,
SIGHASH_FORKID,
SignatureHashForkId,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.txtools import pad_tx
from test_framework.util import assert_equal
# Use this class for tests that require behavior other than normal p2p behavior.
# For now, it is used to serialize a bloated varint (b64).
class CBrokenBlock(CBlock):
    def initialize(self, base_block):
        """Copy the transactions of base_block and recompute the merkle root."""
        self.vtx = copy.deepcopy(base_block.vtx)
        self.hashMerkleRoot = self.calc_merkle_root()

    def serialize(self):
        """Serialize with a deliberately bloated 9-byte varint for the tx count.

        NOTE: super(CBlock, self).serialize() intentionally bypasses
        CBlock.serialize so the tx count and tx data can be written by hand.
        """
        out = b""
        out += super(CBlock, self).serialize()
        # 0xFF prefix forces the 8-byte (uint64) varint encoding.
        out += struct.pack("<BQ", 255, len(self.vtx))
        for tx in self.vtx:
            out += tx.serialize()
        return out

    def normal_serialize(self):
        """Canonical CBlock serialization, for comparison."""
        return super().serialize()
# Coinbase scriptSig pushing the single byte 0x78 (decimal 120), used by the
# test below to create a duplicate coinbase.
# Valid for block at height 120
DUPLICATE_COINBASE_SCRIPT_SIG = b"\x01\x78"
class FullBlockTest(BitcoinTestFramework):
    def set_test_params(self):
        """Single node on a clean chain with relaxed tx policy."""
        self.num_nodes = 1
        self.setup_clean_chain = True
        # This is a consensus block test, we don't care about tx policy
        self.extra_args = [["-noparkdeepreorg", "-acceptnonstdtxn=1"]]
def run_test(self):
    """Drive the node through a long sequence of hand-built blocks.

    Each block is constructed locally (via next_block/update_block), sent
    over the P2P connection, and checked for the expected acceptance or
    rejection reason.  The test covers reorgs, double spends, coinbase
    rules, merkle-tree malleability (CVE-2012-2459), BIP30, timestamps,
    block-size limits and more.
    """
    node = self.nodes[0]  # convenience reference to the node
    self.bootstrap_p2p()  # Add one p2p connection to the node

    self.block_heights = {}
    self.coinbase_key = ECKey()
    self.coinbase_key.generate()
    self.coinbase_pubkey = self.coinbase_key.get_pubkey().get_bytes()
    self.tip = None
    self.blocks = {}
    self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
    self.block_heights[self.genesis_hash] = 0
    self.spendable_outputs = []

    # Create a new block
    b_dup_cb = self.next_block("dup_cb")
    b_dup_cb.vtx[0].vin[0].scriptSig = DUPLICATE_COINBASE_SCRIPT_SIG
    b_dup_cb.vtx[0].rehash()
    duplicate_tx = b_dup_cb.vtx[0]
    b_dup_cb = self.update_block("dup_cb", [])
    self.send_blocks([b_dup_cb])

    b0 = self.next_block(0)
    self.save_spendable_output()
    self.send_blocks([b0])

    # These constants chosen specifically to trigger an immature coinbase spend
    # at a certain time below.
    NUM_BUFFER_BLOCKS_TO_GENERATE = 99
    NUM_OUTPUTS_TO_COLLECT = 33

    # Allow the block to mature
    blocks = []
    for i in range(NUM_BUFFER_BLOCKS_TO_GENERATE):
        blocks.append(self.next_block(f"maturitybuffer.{i}"))
        self.save_spendable_output()
    self.send_blocks(blocks)

    # collect spendable outputs now to avoid cluttering the code later on
    out = []
    for _ in range(NUM_OUTPUTS_TO_COLLECT):
        out.append(self.get_spendable_output())

    # Start by building a couple of blocks on top (which output is spent is
    # in parentheses):
    #     genesis -> b1 (0) -> b2 (1)
    b1 = self.next_block(1, spend=out[0])
    self.save_spendable_output()

    b2 = self.next_block(2, spend=out[1])
    self.save_spendable_output()

    self.send_blocks([b1, b2])

    # Select a txn with an output eligible for spending. This won't actually be spent,
    # since we're testing submission of a series of blocks with invalid
    # txns.
    attempt_spend_tx = out[2]

    # Submit blocks for rejection, each of which contains a single transaction
    # (aside from coinbase) which should be considered invalid.
    for TxTemplate in invalid_txs.iter_all_templates():
        template = TxTemplate(spend_tx=attempt_spend_tx)

        if template.valid_in_block:
            continue

        self.log.info("Reject block with invalid tx: %s", TxTemplate.__name__)
        blockname = f"for_invalid.{TxTemplate.__name__}"
        badblock = self.next_block(blockname)
        badtx = template.get_tx()
        if TxTemplate != invalid_txs.InputMissing:
            self.sign_tx(badtx, attempt_spend_tx)
        badtx.rehash()
        badblock = self.update_block(blockname, [badtx])
        self.send_blocks(
            [badblock],
            success=False,
            reject_reason=(template.block_reject_reason or template.reject_reason),
            reconnect=True,
        )
        self.move_tip(2)

    # Fork like this:
    #
    #     genesis -> b1 (0) -> b2 (1)
    #                      \-> b3 (1)
    #
    # Nothing should happen at this point. We saw b2 first so it takes
    # priority.
    self.log.info("Don't reorg to a chain of the same length")
    self.move_tip(1)
    b3 = self.next_block(3, spend=out[1])
    txout_b3 = b3.vtx[1]
    self.send_blocks([b3], False)

    # Now we add another block to make the alternative chain longer.
    #
    #     genesis -> b1 (0) -> b2 (1)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Reorg to a longer chain")
    b4 = self.next_block(4, spend=out[2])
    self.send_blocks([b4])

    # ... and back to the first chain.
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                      \-> b3 (1) -> b4 (2)
    self.move_tip(2)
    b5 = self.next_block(5, spend=out[2])
    self.save_spendable_output()
    self.send_blocks([b5], False)

    self.log.info("Reorg back to the original chain")
    b6 = self.next_block(6, spend=out[3])
    self.send_blocks([b6], True)

    # Try to create a fork that double-spends
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b7 (2) -> b8 (4)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Reject a chain with a double spend, even if it is longer")
    self.move_tip(5)
    b7 = self.next_block(7, spend=out[2])
    self.send_blocks([b7], False)

    b8 = self.next_block(8, spend=out[4])
    self.send_blocks([b8], False, reconnect=True)

    # Try to create a block that has too much fee
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                                    \-> b9 (4)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Reject a block where the miner creates too much coinbase reward")
    self.move_tip(6)
    b9 = self.next_block(9, spend=out[4], additional_coinbase_value=1)
    self.send_blocks(
        [b9], success=False, reject_reason="bad-cb-amount", reconnect=True
    )

    # Create a fork that ends in a block with too much fee (the one that causes the reorg)
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6  (3)
    #                                          \-> b10 (3) -> b11 (4)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info(
        "Reject a chain where the miner creates too much coinbase reward, even if"
        " the chain is longer"
    )
    self.move_tip(5)
    b10 = self.next_block(10, spend=out[3])
    self.send_blocks([b10], False)

    b11 = self.next_block(11, spend=out[4], additional_coinbase_value=1)
    self.send_blocks(
        [b11], success=False, reject_reason="bad-cb-amount", reconnect=True
    )

    # Try again, but with a valid fork first
    #     genesis -> b1 (0) -> b2 (1) -> b5  (2) -> b6  (3)
    #                                          \-> b12 (3) -> b13 (4) -> b14 (5)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info(
        "Reject a chain where the miner creates too much coinbase reward, even if"
        " the chain is longer (on a forked chain)"
    )
    self.move_tip(5)
    b12 = self.next_block(12, spend=out[3])
    self.save_spendable_output()
    b13 = self.next_block(13, spend=out[4])
    self.save_spendable_output()
    b14 = self.next_block(14, spend=out[5], additional_coinbase_value=1)
    self.send_blocks(
        [b12, b13, b14],
        success=False,
        reject_reason="bad-cb-amount",
        reconnect=True,
    )

    # New tip should be b13.
    assert_equal(node.getbestblockhash(), b13.hash)

    self.move_tip(13)
    b15 = self.next_block(15)
    self.save_spendable_output()
    self.send_blocks([b15], True)

    # Attempt to spend a transaction created on a different fork
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b12 (3) -> b13 (4) -> b15 (5) -> b17 (b3.vtx[1])
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Reject a block with a spend from a re-org'ed out tx")
    self.move_tip(15)
    b17 = self.next_block(17, spend=txout_b3)
    self.send_blocks(
        [b17],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    # Attempt to spend a transaction created on a different fork (on a fork this time)
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b12 (3) -> b13 (4) -> b15 (5)
    #                                                                \-> b18 (b3.vtx[1]) -> b19 (6)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info(
        "Reject a block with a spend from a re-org'ed out tx (on a forked chain)"
    )
    self.move_tip(13)
    b18 = self.next_block(18, spend=txout_b3)
    self.send_blocks([b18], False)

    b19 = self.next_block(19, spend=out[6])
    self.send_blocks(
        [b19],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    # Attempt to spend a coinbase at depth too low
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b12 (3) -> b13 (4) -> b15 (5) -> b20 (7)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Reject a block spending an immature coinbase.")
    self.move_tip(15)
    b20 = self.next_block(20, spend=out[7])
    self.send_blocks(
        [b20],
        success=False,
        reject_reason="bad-txns-premature-spend-of-coinbase",
        reconnect=True,
    )

    # Attempt to spend a coinbase at depth too low (on a fork this time)
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b12 (3) -> b13 (4) -> b15 (5)
    #                                                                \-> b21 (6) -> b22 (5)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info(
        "Reject a block spending an immature coinbase (on a forked chain)"
    )
    self.move_tip(13)
    b21 = self.next_block(21, spend=out[6])
    self.send_blocks([b21], False)

    b22 = self.next_block(22, spend=out[5])
    self.send_blocks(
        [b22],
        success=False,
        reject_reason="bad-txns-premature-spend-of-coinbase",
        reconnect=True,
    )

    # Create a block on either side of LEGACY_MAX_BLOCK_SIZE and make sure its accepted/rejected
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b12 (3) -> b13 (4) -> b15 (5) -> b23 (6)
    #                                                                          \-> b24 (6) -> b25 (7)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Accept a block of size LEGACY_MAX_BLOCK_SIZE")
    self.move_tip(15)
    b23 = self.next_block(23, spend=out[6])
    tx = CTransaction()
    # 69 bytes: serialization overhead of the padding tx minus its script.
    script_length = LEGACY_MAX_BLOCK_SIZE - len(b23.serialize()) - 69
    script_output = CScript([b"\x00" * script_length])
    tx.vout.append(CTxOut(0, script_output))
    tx.vin.append(CTxIn(COutPoint(b23.vtx[1].sha256, 0)))
    b23 = self.update_block(23, [tx])
    # Make sure the math above worked out to produce a max-sized block
    assert_equal(len(b23.serialize()), LEGACY_MAX_BLOCK_SIZE)
    self.send_blocks([b23], True)
    self.save_spendable_output()

    # Create blocks with a coinbase input script size out of range
    #     genesis -> b1 (0) -> b2 (1) -> b5 (2) -> b6 (3)
    #                                          \-> b12 (3) -> b13 (4) -> b15 (5) -> b23 (6) -> b30 (7)
    #                                                                          \-> ... (6) -> ... (7)
    #                      \-> b3 (1) -> b4 (2)
    self.log.info("Reject a block with coinbase input script size out of range")
    self.move_tip(15)
    b26 = self.next_block(26, spend=out[6])
    b26.vtx[0].vin[0].scriptSig = b"\x00"
    b26.vtx[0].rehash()
    # update_block causes the merkle root to get updated, even with no new
    # transactions, and updates the required state.
    b26 = self.update_block(26, [])
    self.send_blocks(
        [b26], success=False, reject_reason="bad-cb-length", reconnect=True
    )

    # Extend the b26 chain to make sure bitcoind isn't accepting b26
    b27 = self.next_block(27, spend=out[7])
    self.send_blocks([b27], False)

    # Now try a too-large-coinbase script
    self.move_tip(15)
    b28 = self.next_block(28, spend=out[6])
    b28.vtx[0].vin[0].scriptSig = b"\x00" * 101
    b28.vtx[0].rehash()
    b28 = self.update_block(28, [])
    self.send_blocks(
        [b28], success=False, reject_reason="bad-cb-length", reconnect=True
    )

    # Extend the b28 chain to make sure bitcoind isn't accepting b28
    b29 = self.next_block(29, spend=out[7])
    self.send_blocks([b29], False)

    # b30 has a max-sized coinbase scriptSig.
    self.move_tip(23)
    b30 = self.next_block(30)
    b30.vtx[0].vin[0].scriptSig = b"\x00" * 100
    b30.vtx[0].rehash()
    b30 = self.update_block(30, [])
    self.send_blocks([b30], True)
    self.save_spendable_output()

    b31 = self.next_block(31)
    self.save_spendable_output()
    b33 = self.next_block(33)
    self.save_spendable_output()
    b35 = self.next_block(35)
    self.save_spendable_output()
    self.send_blocks([b31, b33, b35], True)

    # Check spending of a transaction in a block which failed to connect
    #
    # b6  (3)
    # b12 (3) -> b13 (4) -> b15 (5) -> b23 (6) -> b30 (7) -> b31 (8) -> b33 (9) -> b35 (10)
    #                                                                                    \-> b37 (11)
    #                                                                                    \-> b38 (11/37)
    #
    # save 37's spendable output, but then double-spend out11 to invalidate
    # the block
    self.log.info(
        "Reject a block spending transaction from a block which failed to connect"
    )
    self.move_tip(35)
    b37 = self.next_block(37, spend=out[11])
    txout_b37 = b37.vtx[1]
    tx = self.create_and_sign_transaction(out[11], 0)
    b37 = self.update_block(37, [tx])
    self.send_blocks(
        [b37],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    # attempt to spend b37's first non-coinbase tx, at which point b37 was
    # still considered valid
    self.move_tip(35)
    b38 = self.next_block(38, spend=txout_b37)
    self.send_blocks(
        [b38],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    self.move_tip(35)
    b39 = self.next_block(39)
    self.save_spendable_output()
    b41 = self.next_block(41)
    self.send_blocks([b39, b41], True)

    # Fork off of b39 to create a constant base again
    #
    # b23 (6) -> b30 (7) -> b31 (8) -> b33 (9) -> b35 (10) -> b39 (11) -> b42 (12) -> b43 (13)
    #                                                                 \-> b41 (12)
    #
    self.move_tip(39)
    b42 = self.next_block(42, spend=out[12])
    self.save_spendable_output()
    b43 = self.next_block(43, spend=out[13])
    self.save_spendable_output()
    self.send_blocks([b42, b43], True)

    # Test a number of really invalid scenarios
    #
    #  -> b31 (8) -> b33 (9) -> b35 (10) -> b39 (11) -> b42 (12) -> b43 (13) -> b44 (14)
    #                                                                       \-> ??? (15)
    # The next few blocks are going to be created "by hand" since they'll do funky things, such as having
    # the first transaction be non-coinbase, etc. The purpose of b44 is to
    # make sure this works.
    self.log.info("Build block 44 manually")
    height = self.block_heights[self.tip.sha256] + 1
    coinbase = create_coinbase(height, self.coinbase_pubkey)
    b44 = CBlock()
    b44.nTime = self.tip.nTime + 1
    b44.hashPrevBlock = self.tip.sha256
    b44.nBits = 0x207FFFFF
    b44.vtx.append(coinbase)
    b44.hashMerkleRoot = b44.calc_merkle_root()
    b44.solve()
    self.tip = b44
    self.block_heights[b44.sha256] = height
    self.blocks[44] = b44
    self.send_blocks([b44], True)

    self.log.info("Reject a block with a non-coinbase as the first tx")
    non_coinbase = self.create_tx(out[15], 0, 1)
    b45 = CBlock()
    b45.nTime = self.tip.nTime + 1
    b45.hashPrevBlock = self.tip.sha256
    b45.nBits = 0x207FFFFF
    b45.vtx.append(non_coinbase)
    b45.hashMerkleRoot = b45.calc_merkle_root()
    # No explicit calc_sha256() needed here: solve() computes the hash.
    # (A stray "- b45.calc_sha256()" diff-residue line was removed; as
    # written it applied unary minus to calc_sha256()'s None return and
    # would have raised a TypeError.)
    b45.solve()
    self.block_heights[b45.sha256] = self.block_heights[self.tip.sha256] + 1
    self.tip = b45
    self.blocks[45] = b45
    self.send_blocks(
        [b45], success=False, reject_reason="bad-cb-missing", reconnect=True
    )

    self.log.info("Reject a block with no transactions")
    self.move_tip(44)
    b46 = CBlock()
    b46.nTime = b44.nTime + 1
    b46.hashPrevBlock = b44.sha256
    b46.nBits = 0x207FFFFF
    b46.vtx = []
    b46.hashMerkleRoot = 0
    b46.solve()
    self.block_heights[b46.sha256] = self.block_heights[b44.sha256] + 1
    self.tip = b46
    assert 46 not in self.blocks
    self.blocks[46] = b46
    self.send_blocks(
        [b46], success=False, reject_reason="bad-cb-missing", reconnect=True
    )

    self.log.info("Reject a block with invalid work")
    self.move_tip(44)
    b47 = self.next_block(47)
    target = uint256_from_compact(b47.nBits)
    while b47.sha256 <= target:
        # Rehash nonces until an invalid too-high-hash block is found.
        b47.nNonce += 1
        b47.rehash()
    self.send_blocks(
        [b47], False, force_send=True, reject_reason="high-hash", reconnect=True
    )

    self.log.info("Reject a block with a timestamp >2 hours in the future")
    self.move_tip(44)
    b48 = self.next_block(48)
    b48.nTime = int(time.time()) + 60 * 60 * 3
    # Header timestamp has changed. Re-solve the block.
    b48.solve()
    self.send_blocks([b48], False, force_send=True, reject_reason="time-too-new")

    self.log.info("Reject a block with invalid merkle hash")
    self.move_tip(44)
    b49 = self.next_block(49)
    b49.hashMerkleRoot += 1
    b49.solve()
    self.send_blocks(
        [b49], success=False, reject_reason="bad-txnmrklroot", reconnect=True
    )

    self.log.info("Reject a block with incorrect POW limit")
    self.move_tip(44)
    b50 = self.next_block(50)
    b50.nBits = b50.nBits - 1
    b50.solve()
    self.send_blocks(
        [b50], False, force_send=True, reject_reason="bad-diffbits", reconnect=True
    )

    self.log.info("Reject a block with two coinbase transactions")
    self.move_tip(44)
    b51 = self.next_block(51)
    cb2 = create_coinbase(51, self.coinbase_pubkey)
    b51 = self.update_block(51, [cb2])
    self.send_blocks(
        [b51], success=False, reject_reason="bad-tx-coinbase", reconnect=True
    )

    self.log.info("Reject a block with duplicate transactions")
    self.move_tip(44)
    b52 = self.next_block(52, spend=out[15])
    b52 = self.update_block(52, [b52.vtx[1]])
    self.send_blocks(
        [b52], success=False, reject_reason="tx-duplicate", reconnect=True
    )

    # Test block timestamps
    #  -> b31 (8) -> b33 (9) -> b35 (10) -> b39 (11) -> b42 (12) -> b43 (13) -> b53 (14) -> b55 (15)
    #                                                                       \-> b54 (15)
    #
    self.move_tip(43)
    b53 = self.next_block(53, spend=out[14])
    self.send_blocks([b53], False)
    self.save_spendable_output()

    self.log.info("Reject a block with timestamp before MedianTimePast")
    b54 = self.next_block(54, spend=out[15])
    b54.nTime = b35.nTime - 1
    b54.solve()
    self.send_blocks(
        [b54], False, force_send=True, reject_reason="time-too-old", reconnect=True
    )

    # valid timestamp
    self.move_tip(53)
    b55 = self.next_block(55, spend=out[15])
    b55.nTime = b35.nTime
    self.update_block(55, [])
    self.send_blocks([b55], True)
    self.save_spendable_output()

    # Test Merkle tree malleability
    #
    # -> b42 (12) -> b43 (13) -> b53 (14) -> b55 (15) -> b57p2 (16)
    #                                                \-> b57   (16)
    #                                                \-> b56p2 (16)
    #                                                \-> b56   (16)
    #
    # Merkle tree malleability (CVE-2012-2459): repeating sequences of transactions in a block without
    # affecting the merkle root of a block, while still invalidating it.
    # See:  src/consensus/merkle.h
    #
    # b57 has three txns:  coinbase, tx, tx1.  The merkle root computation will duplicate tx.
    # Result:  OK
    #
    # b56 copies b57 but duplicates tx1 and does not recalculate the block hash.  So it has a valid merkle
    # root but duplicate transactions.
    # Result:  Fails
    #
    # b57p2 has six transactions in its merkle tree:
    # - coinbase, tx, tx1, tx2, tx3, tx4
    # Merkle root calculation will duplicate as necessary.
    # Result:  OK.
    #
    # b56p2 copies b57p2 but adds both tx3 and tx4.  The purpose of the test is to make sure the code catches
    # duplicate txns that are not next to one another with the "bad-txns-duplicate" error (which indicates
    # that the error was caught early, avoiding a DOS vulnerability.)

    # b57 - a good block with 2 txs, don't submit until end
    self.move_tip(55)
    b57 = self.next_block(57)
    tx = self.create_and_sign_transaction(out[16], 1)
    tx1 = self.create_tx(tx, 0, 1)
    b57 = self.update_block(57, [tx, tx1])

    # b56 - copy b57, add a duplicate tx
    self.log.info(
        "Reject a block with a duplicate transaction in the Merkle Tree (but with a"
        " valid Merkle Root)"
    )
    self.move_tip(55)
    b56 = copy.deepcopy(b57)
    self.blocks[56] = b56
    assert_equal(len(b56.vtx), 3)
    b56 = self.update_block(56, [b57.vtx[2]])
    assert_equal(b56.hash, b57.hash)
    self.send_blocks(
        [b56], success=False, reject_reason="bad-txns-duplicate", reconnect=True
    )

    # b57p2 - a good block with 6 tx'es, don't submit until end
    self.move_tip(55)
    b57p2 = self.next_block("57p2")
    tx = self.create_and_sign_transaction(out[16], 1)
    tx1 = self.create_tx(tx, 0, 1)
    tx2 = self.create_tx(tx1, 0, 1)
    tx3 = self.create_tx(tx2, 0, 1)
    tx4 = self.create_tx(tx3, 0, 1)
    b57p2 = self.update_block("57p2", [tx, tx1, tx2, tx3, tx4])

    # b56p2 - copy b57p2, duplicate two non-consecutive tx's
    self.log.info(
        "Reject a block with two duplicate transactions in the Merkle Tree (but"
        " with a valid Merkle Root)"
    )
    self.move_tip(55)
    b56p2 = copy.deepcopy(b57p2)
    self.blocks["b56p2"] = b56p2
    assert_equal(len(b56p2.vtx), 6)
    b56p2 = self.update_block("b56p2", b56p2.vtx[4:6], reorder=False)
    assert_equal(b56p2.hash, b57p2.hash)
    self.send_blocks(
        [b56p2], success=False, reject_reason="bad-txns-duplicate", reconnect=True
    )

    self.move_tip("57p2")
    self.send_blocks([b57p2], True)

    self.move_tip(57)
    # The tip is not updated because 57p2 seen first
    self.send_blocks([b57], False)
    self.save_spendable_output()

    # Test a few invalid tx types
    #
    # -> b35 (10) -> b39 (11) -> b42 (12) -> b43 (13) -> b53 (14) -> b55 (15) -> b57 (16) -> b60 ()
    #                                                                                    \-> ??? (17)
    #

    # tx with prevout.n out of range
    self.log.info("Reject a block with a transaction with prevout.n out of range")
    self.move_tip(57)
    b58 = self.next_block(58, spend=out[17])
    tx = CTransaction()
    assert len(out[17].vout) < 42
    tx.vin.append(
        CTxIn(COutPoint(out[17].sha256, 42), CScript([OP_TRUE]), 0xFFFFFFFF)
    )
    tx.vout.append(CTxOut(0, b""))
    pad_tx(tx)
    tx.calc_sha256()
    b58 = self.update_block(58, [tx])
    self.send_blocks(
        [b58],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    # tx with output value > input value
    self.log.info("Reject a block with a transaction with outputs > inputs")
    self.move_tip(57)
    b59 = self.next_block(59)
    tx = self.create_and_sign_transaction(out[17], 51 * COIN)
    b59 = self.update_block(59, [tx])
    self.send_blocks(
        [b59], success=False, reject_reason="bad-txns-in-belowout", reconnect=True
    )

    # reset to good chain
    self.move_tip(57)
    b60 = self.next_block(60)
    self.send_blocks([b60], True)
    self.save_spendable_output()

    # Test BIP30 (reject duplicate)
    #
    # -> b39 (11) -> b42 (12) -> b43 (13) -> b53 (14) -> b55 (15) -> b57 (16) -> b60 ()
    #                                                                                \-> b61 ()
    #
    # Blocks are not allowed to contain a transaction whose id matches that of an earlier,
    # not-fully-spent transaction in the same chain. To test, make identical coinbases;
    # the second one should be rejected. See also CVE-2012-1909.
    #
    self.log.info(
        "Reject a block with a transaction with a duplicate hash of a previous"
        " transaction (BIP30)"
    )
    self.move_tip(60)
    b61 = self.next_block(61)
    b61.vtx[0].vin[0].scriptSig = DUPLICATE_COINBASE_SCRIPT_SIG
    b61.vtx[0].rehash()
    b61 = self.update_block(61, [])
    assert_equal(duplicate_tx.serialize(), b61.vtx[0].serialize())
    self.send_blocks(
        [b61], success=False, reject_reason="bad-txns-BIP30", reconnect=True
    )

    # Test BIP30 (allow duplicate if spent)
    #
    # -> b57 (16) -> b60 ()
    #                    \-> b_spend_dup_cb (b_dup_cb) -> b_dup_2 ()
    #
    self.move_tip(57)
    b_spend_dup_cb = self.next_block("spend_dup_cb")
    tx = CTransaction()
    tx.vin.append(CTxIn(COutPoint(duplicate_tx.sha256, 0)))
    tx.vout.append(CTxOut(0, CScript([OP_TRUE])))
    self.sign_tx(tx, duplicate_tx)
    tx.rehash()
    b_spend_dup_cb = self.update_block("spend_dup_cb", [tx])

    b_dup_2 = self.next_block("dup_2")
    b_dup_2.vtx[0].vin[0].scriptSig = DUPLICATE_COINBASE_SCRIPT_SIG
    b_dup_2.vtx[0].rehash()
    b_dup_2 = self.update_block("dup_2", [])
    assert_equal(duplicate_tx.serialize(), b_dup_2.vtx[0].serialize())
    assert_equal(
        self.nodes[0].gettxout(txid=duplicate_tx.hash, n=0)["confirmations"], 119
    )
    self.send_blocks([b_spend_dup_cb, b_dup_2], success=True)
    # The duplicate has less confirmations
    assert_equal(
        self.nodes[0].gettxout(txid=duplicate_tx.hash, n=0)["confirmations"], 1
    )

    # Test tx.isFinal is properly rejected (not an exhaustive tx.isFinal test, that should be in data-driven transaction tests)
    #
    # -> b_spend_dup_cb (b_dup_cb) -> b_dup_2 ()
    #                                         \-> b62 (18)
    #
    self.log.info("Reject a block with a transaction with a nonfinal locktime")
    self.move_tip("dup_2")
    b62 = self.next_block(62)
    tx = CTransaction()
    tx.nLockTime = 0xFFFFFFFF  # this locktime is non-final
    # don't set nSequence
    tx.vin.append(CTxIn(COutPoint(out[18].sha256, 0)))
    tx.vout.append(CTxOut(0, CScript([OP_TRUE])))
    assert tx.vin[0].nSequence < 0xFFFFFFFF
    tx.calc_sha256()
    b62 = self.update_block(62, [tx])
    self.send_blocks(
        [b62], success=False, reject_reason="bad-txns-nonfinal", reconnect=True
    )

    # Test a non-final coinbase is also rejected
    #
    # -> b_spend_dup_cb (b_dup_cb) -> b_dup_2 ()
    #                                         \-> b63 (-)
    #
    self.log.info(
        "Reject a block with a coinbase transaction with a nonfinal locktime"
    )
    self.move_tip("dup_2")
    b63 = self.next_block(63)
    b63.vtx[0].nLockTime = 0xFFFFFFFF
    b63.vtx[0].vin[0].nSequence = 0xDEADBEEF
    b63.vtx[0].rehash()
    b63 = self.update_block(63, [])
    self.send_blocks(
        [b63], success=False, reject_reason="bad-txns-nonfinal", reconnect=True
    )

    # This checks that a block with a bloated VARINT between the block_header and the array of tx such that
    # the block is > LEGACY_MAX_BLOCK_SIZE with the bloated varint, but <= LEGACY_MAX_BLOCK_SIZE without the bloated varint,
    # does not cause a subsequent, identical block with canonical encoding to be rejected.  The test does not
    # care whether the bloated block is accepted or rejected; it only cares that the second block is accepted.
    #
    # What matters is that the receiving node should not reject the bloated block, and then reject the canonical
    # block on the basis that it's the same as an already-rejected block (which would be a consensus failure.)
    #
    # -> b_spend_dup_cb (b_dup_cb) -> b_dup_2 () -> b64 (18)
    #                                           \
    #                                             b64a (18)
    # b64a is a bloated block (non-canonical varint)
    # b64 is a good block (same as b64 but w/ canonical varint)
    #
    self.log.info(
        "Accept a valid block even if a bloated version of the block has previously"
        " been sent"
    )
    self.move_tip("dup_2")
    regular_block = self.next_block("64a", spend=out[18])

    # make it a "broken_block," with non-canonical serialization
    b64a = CBrokenBlock(regular_block)
    b64a.initialize(regular_block)
    self.blocks["64a"] = b64a
    self.tip = b64a
    tx = CTransaction()

    # use canonical serialization to calculate size
    script_length = LEGACY_MAX_BLOCK_SIZE - len(b64a.normal_serialize()) - 69
    script_output = CScript([b"\x00" * script_length])
    tx.vout.append(CTxOut(0, script_output))
    tx.vin.append(CTxIn(COutPoint(b64a.vtx[1].sha256, 0)))
    b64a = self.update_block("64a", [tx])
    assert_equal(len(b64a.serialize()), LEGACY_MAX_BLOCK_SIZE + 8)
    self.send_blocks(
        [b64a], success=False, reject_reason="non-canonical ReadCompactSize()"
    )

    # bitcoind doesn't disconnect us for sending a bloated block, but if we subsequently
    # resend the header message, it won't send us the getdata message again. Just
    # disconnect and reconnect and then call send_blocks.
    # TODO: improve this test to be less dependent on P2P DOS behaviour.
    node.disconnect_p2ps()
    self.reconnect_p2p()

    self.move_tip("dup_2")
    b64 = CBlock(b64a)
    b64.vtx = copy.deepcopy(b64a.vtx)
    assert_equal(b64.hash, b64a.hash)
    assert_equal(len(b64.serialize()), LEGACY_MAX_BLOCK_SIZE)
    self.blocks[64] = b64
    b64 = self.update_block(64, [])
    self.send_blocks([b64], True)
    self.save_spendable_output()

    # Spend an output created in the block itself
    #
    # -> b_dup_2 () -> b64 (18) -> b65 (19)
    #
    self.log.info(
        "Accept a block with a transaction spending an output created in the same"
        " block"
    )
    self.move_tip(64)
    b65 = self.next_block(65)
    tx1 = self.create_and_sign_transaction(out[19], out[19].vout[0].nValue)
    tx2 = self.create_and_sign_transaction(tx1, 0)
    b65 = self.update_block(65, [tx1, tx2])
    self.send_blocks([b65], True)
    self.save_spendable_output()

    # Attempt to double-spend a transaction created in a block
    #
    # -> b64 (18) -> b65 (19)
    #                        \-> b67 (20)
    #
    #
    self.log.info(
        "Reject a block with a transaction double spending a transaction created in"
        " the same block"
    )
    self.move_tip(65)
    b67 = self.next_block(67)
    tx1 = self.create_and_sign_transaction(out[20], out[20].vout[0].nValue)
    tx2 = self.create_and_sign_transaction(tx1, 1)
    tx3 = self.create_and_sign_transaction(tx1, 2)
    b67 = self.update_block(67, [tx1, tx2, tx3])
    self.send_blocks(
        [b67],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    # More tests of block subsidy
    #
    # -> b64 (18) -> b65 (19) -> b69 (20)
    #                        \-> b68 (20)
    #
    # b68 - coinbase with an extra 10 satoshis,
    #       creates a tx that has 9 satoshis from out[20] go to fees
    #       this fails because the coinbase is trying to claim 1 satoshi too much in fees
    #
    # b69 - coinbase with extra 10 satoshis, and a tx that gives a 10 satoshi fee
    #       this succeeds
    #
    self.log.info(
        "Reject a block trying to claim too much subsidy in the coinbase"
        " transaction"
    )
    self.move_tip(65)
    b68 = self.next_block(68, additional_coinbase_value=10)
    tx = self.create_and_sign_transaction(out[20], out[20].vout[0].nValue - 9)
    b68 = self.update_block(68, [tx])
    self.send_blocks(
        [b68], success=False, reject_reason="bad-cb-amount", reconnect=True
    )

    self.log.info(
        "Accept a block claiming the correct subsidy in the coinbase transaction"
    )
    self.move_tip(65)
    b69 = self.next_block(69, additional_coinbase_value=10)
    tx = self.create_and_sign_transaction(out[20], out[20].vout[0].nValue - 10)
    self.update_block(69, [tx])
    self.send_blocks([b69], True)
    self.save_spendable_output()

    # Test spending the outpoint of a non-existent transaction
    #
    # -> b65 (19) -> b69 (20)
    #                        \-> b70 (21)
    #
    self.log.info(
        "Reject a block containing a transaction spending from a non-existent input"
    )
    self.move_tip(69)
    b70 = self.next_block(70, spend=out[21])
    bogus_tx = CTransaction()
    bogus_tx.sha256 = uint256_from_str(
        b"23c70ed7c0506e9178fc1a987f40a33946d4ad4c962b5ae3a52546da53af0c5c"
    )
    tx = CTransaction()
    tx.vin.append(CTxIn(COutPoint(bogus_tx.sha256, 0), b"", 0xFFFFFFFF))
    tx.vout.append(CTxOut(1, b""))
    pad_tx(tx)
    b70 = self.update_block(70, [tx])
    self.send_blocks(
        [b70],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    # Test accepting an invalid block which has the same hash as a valid one (via merkle tree tricks)
    #
    #  -> b65 (19) -> b69 (20) -> b72 (21)
    #                         \-> b71 (21)
    #
    # b72 is a good block.
    # b71 is a copy of 72, but re-adds one of its transactions.  However,
    # it has the same hash as b72.
    self.log.info(
        "Reject a block containing a duplicate transaction but with the same Merkle"
        " root (Merkle tree malleability"
    )
    self.move_tip(69)
    b72 = self.next_block(72)
    tx1 = self.create_and_sign_transaction(out[21], 2)
    tx2 = self.create_and_sign_transaction(tx1, 1)
    b72 = self.update_block(72, [tx1, tx2])  # now tip is 72
    b71 = copy.deepcopy(b72)
    # add duplicate last transaction
    b71.vtx.append(b72.vtx[-1])
    # b71 builds off b69
    self.block_heights[b71.sha256] = self.block_heights[b69.sha256] + 1
    self.blocks[71] = b71

    assert_equal(len(b71.vtx), 4)
    assert_equal(len(b72.vtx), 3)
    assert_equal(b72.sha256, b71.sha256)

    self.move_tip(71)
    self.send_blocks(
        [b71], success=False, reject_reason="bad-txns-duplicate", reconnect=True
    )

    self.move_tip(72)
    self.send_blocks([b72], True)
    self.save_spendable_output()

    b75 = self.next_block(75)
    self.save_spendable_output()

    b76 = self.next_block(76)
    self.save_spendable_output()
    self.send_blocks([b75, b76], True)

    # Test transaction resurrection
    #
    # -> b77 (24) -> b78 (25) -> b79 (26)
    #            \-> b80 (25) -> b81 (26) -> b82 (27)
    #
    #    b78 creates a tx, which is spent in b79. After b82, both should be in mempool
    #
    #    The tx'es must be unsigned and pass the node's mempool policy.  It is unsigned for the
    #    rather obscure reason that the Python signature code does not distinguish between
    #    Low-S and High-S values (whereas the bitcoin code has custom code which does so);
    #    as a result of which, the odds are 50% that the python code will use the right
    #    value and the transaction will be accepted into the mempool. Until we modify the
    #    test framework to support low-S signing, we are out of luck.
    #
    #    To get around this issue, we construct transactions which are not signed and which
    #    spend to OP_TRUE.  If the standard-ness rules change, this test would need to be
    #    updated.  (Perhaps to spend to a P2SH OP_TRUE script)
    self.log.info("Test transaction resurrection during a re-org")
    self.move_tip(76)
    b77 = self.next_block(77)
    tx77 = self.create_and_sign_transaction(out[24], 10 * COIN)
    b77 = self.update_block(77, [tx77])
    self.send_blocks([b77], True)
    self.save_spendable_output()

    b78 = self.next_block(78)
    tx78 = self.create_tx(tx77, 0, 9 * COIN)
    b78 = self.update_block(78, [tx78])
    self.send_blocks([b78], True)

    b79 = self.next_block(79)
    tx79 = self.create_tx(tx78, 0, 8 * COIN)
    b79 = self.update_block(79, [tx79])
    self.send_blocks([b79], True)

    # mempool should be empty
    assert_equal(len(self.nodes[0].getrawmempool()), 0)

    self.move_tip(77)
    b80 = self.next_block(80, spend=out[25])
    self.send_blocks([b80], False, force_send=True)
    self.save_spendable_output()

    b81 = self.next_block(81, spend=out[26])
    # other chain is same length
    self.send_blocks([b81], False, force_send=True)
    self.save_spendable_output()

    b82 = self.next_block(82, spend=out[27])
    # now this chain is longer, triggers re-org
    self.send_blocks([b82], True)
    self.save_spendable_output()

    # now check that tx78 and tx79 have been put back into the peer's
    # mempool
    mempool = self.nodes[0].getrawmempool()
    assert_equal(len(mempool), 2)
    assert tx78.hash in mempool
    assert tx79.hash in mempool

    # Test invalid opcodes in dead execution paths.
    #
    #  -> b81 (26) -> b82 (27) -> b83 (28)
    #
    self.log.info("Accept a block with invalid opcodes in dead execution paths")
    b83 = self.next_block(83)
    op_codes = [OP_IF, OP_INVALIDOPCODE, OP_ELSE, OP_TRUE, OP_ENDIF]
    script = CScript(op_codes)
    tx1 = self.create_and_sign_transaction(out[28], out[28].vout[0].nValue, script)

    tx2 = self.create_and_sign_transaction(tx1, 0, CScript([OP_TRUE]))
    tx2.vin[0].scriptSig = CScript([OP_FALSE])
    tx2.rehash()

    b83 = self.update_block(83, [tx1, tx2])
    self.send_blocks([b83], True)
    self.save_spendable_output()

    # Reorg on/off blocks that have OP_RETURN in them (and try to spend them)
    #
    #  -> b81 (26) -> b82 (27) -> b83 (28) -> b84 (29) -> b87 (30) -> b88 (31)
    #                                     \-> b85 (29) -> b86 (30)            \-> b89a (32)
    #
    self.log.info("Test re-orging blocks with OP_RETURN in them")
    b84 = self.next_block(84)
    tx1 = self.create_tx(out[29], 0, 0, CScript([OP_RETURN]))
    vout_offset = len(tx1.vout)
    tx1.vout.append(CTxOut(0, CScript([OP_TRUE])))
    tx1.vout.append(CTxOut(0, CScript([OP_TRUE])))
    tx1.vout.append(CTxOut(0, CScript([OP_TRUE])))
    tx1.vout.append(CTxOut(0, CScript([OP_TRUE])))
    tx1.calc_sha256()
    self.sign_tx(tx1, out[29])
    tx1.rehash()
    tx2 = self.create_tx(tx1, vout_offset, 0, CScript([OP_RETURN]))
    tx2.vout.append(CTxOut(0, CScript([OP_RETURN])))
    tx3 = self.create_tx(tx1, vout_offset + 1, 0, CScript([OP_RETURN]))
    tx3.vout.append(CTxOut(0, CScript([OP_TRUE])))
    tx4 = self.create_tx(tx1, vout_offset + 2, 0, CScript([OP_TRUE]))
    tx4.vout.append(CTxOut(0, CScript([OP_RETURN])))
    tx5 = self.create_tx(tx1, vout_offset + 3, 0, CScript([OP_RETURN]))

    b84 = self.update_block(84, [tx1, tx2, tx3, tx4, tx5])
    self.send_blocks([b84], True)
    self.save_spendable_output()

    self.move_tip(83)
    b85 = self.next_block(85, spend=out[29])
    self.send_blocks([b85], False)  # other chain is same length

    b86 = self.next_block(86, spend=out[30])
    self.send_blocks([b86], True)

    self.move_tip(84)
    b87 = self.next_block(87, spend=out[30])
    self.send_blocks([b87], False)  # other chain is same length
    self.save_spendable_output()

    b88 = self.next_block(88, spend=out[31])
    self.send_blocks([b88], True)
    self.save_spendable_output()

    # trying to spend the OP_RETURN output is rejected
    b89a = self.next_block("89a", spend=out[32])
    tx = self.create_tx(tx1, 0, 0, CScript([OP_TRUE]))
    b89a = self.update_block("89a", [tx])
    self.send_blocks(
        [b89a],
        success=False,
        reject_reason="bad-txns-inputs-missingorspent",
        reconnect=True,
    )

    self.log.info("Test a re-org of one week's worth of blocks (1088 blocks)")

    self.move_tip(88)
    LARGE_REORG_SIZE = 1088
    blocks = []
    spend = out[32]
    for i in range(89, LARGE_REORG_SIZE + 89):
        b = self.next_block(i, spend)
        tx = CTransaction()
        script_length = LEGACY_MAX_BLOCK_SIZE - len(b.serialize()) - 69
        script_output = CScript([b"\x00" * script_length])
        tx.vout.append(CTxOut(0, script_output))
        tx.vin.append(CTxIn(COutPoint(b.vtx[1].sha256, 0)))
        b = self.update_block(i, [tx])
        assert_equal(len(b.serialize()), LEGACY_MAX_BLOCK_SIZE)
        blocks.append(b)
        self.save_spendable_output()
        spend = self.get_spendable_output()

    self.send_blocks(blocks, True, timeout=2440)
    chain1_tip = i

    # now create alt chain of same length
    self.move_tip(88)
    blocks2 = []
    for i in range(89, LARGE_REORG_SIZE + 89):
        blocks2.append(self.next_block(f"alt{i}"))
    self.send_blocks(blocks2, False, force_send=False)

    # extend alt chain to trigger re-org
    block = self.next_block(f"alt{chain1_tip + 1}")
    self.send_blocks([block], True, timeout=2440)

    # ... and re-org back to the first chain
    self.move_tip(chain1_tip)
    block = self.next_block(chain1_tip + 1)
    self.send_blocks([block], False, force_send=True)
    block = self.next_block(chain1_tip + 2)
    self.send_blocks([block], True, timeout=2440)

    self.log.info("Reject a block with an invalid block header version")
    b_v1 = self.next_block("b_v1", version=1)
    self.send_blocks(
        [b_v1],
        success=False,
        force_send=True,
        reject_reason="bad-version(0x00000001)",
        reconnect=True,
    )

    self.move_tip(chain1_tip + 2)
    b_cb34 = self.next_block("b_cb34")
    # Truncate the height-encoding scriptSig so the coinbase no longer
    # commits to the correct block height (BIP34).
    b_cb34.vtx[0].vin[0].scriptSig = b_cb34.vtx[0].vin[0].scriptSig[:-1]
    b_cb34.vtx[0].rehash()
    b_cb34.hashMerkleRoot = b_cb34.calc_merkle_root()
    b_cb34.solve()
    self.send_blocks(
        [b_cb34], success=False, reject_reason="bad-cb-height", reconnect=True
    )
# Helper methods
################
def add_transactions_to_block(self, block, tx_list):
    """Rehash every tx in tx_list and append them to block.vtx.

    The merkle root is NOT recomputed here; callers (e.g. update_block)
    recompute it afterwards.
    """
    # Use a plain loop: a list comprehension for side effects only is an
    # anti-pattern (it builds and discards a throwaway list).
    for tx in tx_list:
        tx.rehash()
    block.vtx.extend(tx_list)
# this is a little handier to use than the version in blocktools.py
def create_tx(self, spend_tx, n, value, script=CScript([OP_TRUE])):
    """Build an unsigned tx spending output n of spend_tx for `value`."""
    new_tx = create_tx_with_script(
        spend_tx, n, amount=value, script_pub_key=script
    )
    return new_tx
# sign a transaction, using the key we know about
# this signs input 0 in tx, which is assumed to be spending output n in
# spend_tx
def sign_tx(self, tx, spend_tx):
    """Sign input 0 of `tx` with self.coinbase_key (SIGHASH_ALL|FORKID).

    If the previous output is an anyone-can-spend (OP_TRUE), an empty
    scriptSig is used instead of a real signature.
    """
    prev_script = bytearray(spend_tx.vout[0].scriptPubKey)
    if prev_script[0] == OP_TRUE:
        # anyone-can-spend output: no signature required
        tx.vin[0].scriptSig = CScript()
        return
    hashtype = SIGHASH_ALL | SIGHASH_FORKID
    sighash = SignatureHashForkId(
        spend_tx.vout[0].scriptPubKey,
        tx,
        0,
        hashtype,
        spend_tx.vout[0].nValue,
    )
    signature = self.coinbase_key.sign_ecdsa(sighash) + bytes([hashtype])
    tx.vin[0].scriptSig = CScript([signature])
def create_and_sign_transaction(self, spend_tx, value, script=CScript([OP_TRUE])):
    """Return a signed, rehashed tx spending output 0 of spend_tx."""
    signed = self.create_tx(spend_tx, 0, value, script)
    self.sign_tx(signed, spend_tx)
    signed.rehash()
    return signed
def next_block(
    self,
    number,
    spend=None,
    additional_coinbase_value=0,
    script=CScript([OP_TRUE]),
    *,
    version=4,
):
    """Build, solve and register one block on top of self.tip.

    number: key under which the block is stored in self.blocks (must be
        unused — asserted below).
    spend: optional previous tx; its output 0 is consumed, 1 satoshi is
        spent to `script` and the remainder goes to the coinbase as fees.
    additional_coinbase_value: extra value added to the coinbase output.
    version: block header version.

    Returns the solved block; also advances self.tip and records the
    block's height in self.block_heights.
    """
    if self.tip is None:
        # No tip yet: build directly on top of genesis.
        base_block_hash = self.genesis_hash
        block_time = int(time.time()) + 1
    else:
        base_block_hash = self.tip.sha256
        block_time = self.tip.nTime + 1
    # First create the coinbase
    height = self.block_heights[base_block_hash] + 1
    coinbase = create_coinbase(height, self.coinbase_pubkey)
    coinbase.vout[0].nValue += additional_coinbase_value
    coinbase.rehash()
    if spend is None:
        block = create_block(base_block_hash, coinbase, block_time, version=version)
    else:
        # all but one satoshi to fees
        coinbase.vout[0].nValue += spend.vout[0].nValue - 1
        # coinbase changed, so its hash must be recomputed before use
        coinbase.rehash()
        block = create_block(base_block_hash, coinbase, block_time, version=version)
        # spend 1 satoshi
        tx = self.create_tx(spend, 0, 1, script)
        self.sign_tx(tx, spend)
        self.add_transactions_to_block(block, [tx])
        block.hashMerkleRoot = block.calc_merkle_root()
    # Block is created. Find a valid nonce.
    block.solve()
    self.tip = block
    self.block_heights[block.sha256] = height
    assert number not in self.blocks
    self.blocks[number] = block
    return block
# save the current tip so it can be spent by a later block
def save_spendable_output(self):
    """Remember the current tip block; its coinbase can be spent later."""
    tip = self.tip
    self.log.debug(f"saving spendable output {tip.vtx[0]}")
    self.spendable_outputs.append(tip)
# get an output that we previously marked as spendable
def get_spendable_output(self):
    """Pop the oldest saved spendable block and return its coinbase tx."""
    self.log.debug(f"getting spendable output {self.spendable_outputs[0].vtx[0]}")
    oldest = self.spendable_outputs.pop(0)
    return oldest.vtx[0]
# move the tip back to a previous block
def move_tip(self, number):
    """Reset self.tip to the block previously stored under key `number`."""
    self.tip = self.blocks[number]
# adds transactions to the block and updates state
def update_block(self, block_number, new_transactions, reorder=True):
    """Append new_transactions to a stored block and re-solve it.

    reorder: if True, sort the block's txs to satisfy CTOR before
        recomputing the merkle root.
    Updates self.tip and, if the block hash changed, re-keys
    self.block_heights from the old hash to the new one.
    """
    block = self.blocks[block_number]
    self.add_transactions_to_block(block, new_transactions)
    # Remember the pre-update hash so the heights map can be re-keyed below.
    old_sha256 = block.sha256
    if reorder:
        make_conform_to_ctor(block)
    block.hashMerkleRoot = block.calc_merkle_root()
    block.solve()
    # Update the internal state just like in next_block
    self.tip = block
    if block.sha256 != old_sha256:
        self.block_heights[block.sha256] = self.block_heights[old_sha256]
        del self.block_heights[old_sha256]
    self.blocks[block_number] = block
    return block
def bootstrap_p2p(self, timeout=60):
    """Add a P2P connection to the node.

    Helper to connect and wait for version handshake.
    Stores the connection in self.helper_peer for use by send_blocks().
    """
    self.helper_peer = self.nodes[0].add_p2p_connection(P2PDataStore())
    # We need to wait for the initial getheaders from the peer before we
    # start populating our blockstore. If we don't, then we may run ahead
    # to the next subtest before we receive the getheaders. We'd then send
    # an INV for the next block and receive two getheaders - one for the
    # IBD and one for the INV. We'd respond to both and could get
    # unexpectedly disconnected if the DoS score for that error is 50.
    self.helper_peer.wait_for_getheaders(timeout=timeout)
def reconnect_p2p(self, timeout=60):
    """Tear down and bootstrap the P2P connection to the node.

    The node gets disconnected several times in this test. This helper
    method reconnects the p2p and restarts the network thread.
    """
    self.nodes[0].disconnect_p2ps()
    self.bootstrap_p2p(timeout=timeout)
def send_blocks(
    self,
    blocks,
    success=True,
    reject_reason=None,
    force_send=False,
    reconnect=False,
    timeout=60,
):
    """Sends blocks to test node. Syncs and verifies that tip has advanced to most recent block.

    Call with success = False if the tip shouldn't advance to the most recent block.

    reject_reason: expected rejection message (forwarded to
        send_blocks_and_test).
    force_send: forwarded to send_blocks_and_test — presumably sends the
        block unsolicited rather than announcing it first (confirm in
        p2p.py).
    reconnect: set when the node is expected to disconnect us; the p2p
        connection is re-established afterwards.
    """
    self.helper_peer.send_blocks_and_test(
        blocks,
        self.nodes[0],
        success=success,
        reject_reason=reject_reason,
        force_send=force_send,
        timeout=timeout,
        expect_disconnect=reconnect,
    )
    if reconnect:
        self.reconnect_p2p(timeout=timeout)
if __name__ == "__main__":
    # Entry point when executed as a standalone script.
    FullBlockTest().main()
diff --git a/test/functional/feature_csv_activation.py b/test/functional/feature_csv_activation.py
index e311cdd4c..e16888535 100644
--- a/test/functional/feature_csv_activation.py
+++ b/test/functional/feature_csv_activation.py
@@ -1,657 +1,656 @@
# Copyright (c) 2015-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test activation of the first version bits soft fork.
This soft fork will activate the following BIPS:
BIP 68 - nSequence relative lock times
BIP 112 - CHECKSEQUENCEVERIFY
BIP 113 - MedianTimePast semantics for nLockTime
regtest lock-in with 108/144 block signalling
activation after a further 144 blocks
mine 82 blocks whose coinbases will be used to generate inputs for our tests
mine 489 blocks and seed block chain with the 82 inputs we will use for our tests at height 572
mine 3 blocks and verify still at LOCKED_IN and test that enforcement has not triggered
mine 1 block and test that enforcement has triggered (which triggers ACTIVE)
Test BIP 113 is enforced
Mine 4 blocks so next height is 580 and test BIP 68 is enforced for time and height
Mine 1 block so next height is 581 and test BIP 68 now passes time but not height
Mine 1 block so next height is 582 and test BIP 68 now passes time and height
Test that BIP 112 is enforced
Various transactions will be used to test that the BIPs rules are not enforced before the soft fork activates
And that after the soft fork activates transactions pass and fail as they should according to the rules.
For each BIP, transactions of versions 1 and 2 will be tested.
----------------
BIP 113:
bip113tx - modify the nLocktime variable
BIP 68:
bip68txs - 16 txs with nSequence relative locktime of 10 with various bits set as per the relative_locktimes below
BIP 112:
bip112txs_vary_nSequence - 16 txs with nSequence relative_locktimes of 10 evaluated against 10 OP_CSV OP_DROP
bip112txs_vary_nSequence_9 - 16 txs with nSequence relative_locktimes of 9 evaluated against 10 OP_CSV OP_DROP
bip112txs_vary_OP_CSV - 16 txs with nSequence = 10 evaluated against varying {relative_locktimes of 10} OP_CSV OP_DROP
bip112txs_vary_OP_CSV_9 - 16 txs with nSequence = 9 evaluated against varying {relative_locktimes of 10} OP_CSV OP_DROP
bip112tx_special - test negative argument to OP_CSV
"""
import time
from itertools import product
from test_framework.address import ADDRESS_ECREG_UNSPENDABLE
from test_framework.blocktools import (
create_block,
create_coinbase,
make_conform_to_ctor,
)
from test_framework.messages import XEC, CTransaction, FromHex
from test_framework.p2p import P2PDataStore
from test_framework.script import OP_CHECKSEQUENCEVERIFY, OP_DROP, OP_TRUE, CScript
from test_framework.test_framework import BitcoinTestFramework
from test_framework.txtools import pad_tx
from test_framework.util import assert_equal
from test_framework.wallet import MiniWallet
BASE_RELATIVE_LOCKTIME = 10
SEQ_DISABLE_FLAG = 1 << 31
SEQ_RANDOM_HIGH_BIT = 1 << 25
SEQ_TYPE_FLAG = 1 << 22
SEQ_RANDOM_LOW_BIT = 1 << 18


def relative_locktime(sdf, srhb, stf, srlb):
    """Returns a locktime with certain bits set."""
    # OR each requested flag onto the base relative locktime (the flag
    # bits are all above bit 17, so they never collide with the base).
    flags = (
        (SEQ_DISABLE_FLAG if sdf else 0)
        | (SEQ_RANDOM_HIGH_BIT if srhb else 0)
        | (SEQ_TYPE_FLAG if stf else 0)
        | (SEQ_RANDOM_LOW_BIT if srlb else 0)
    )
    return BASE_RELATIVE_LOCKTIME | flags
def all_rlt_txs(txs):
    """Return the bare tx objects from a list of {"tx": ..., ...} dicts."""
    return [entry["tx"] for entry in txs]
def get_csv_status(node):
    """Return True once the node's tip has reached the CSV activation height (576)."""
    return node.getblockchaininfo()["blocks"] >= 576
class BIP68_112_113Test(BitcoinTestFramework):
def set_test_params(self):
    # Single node on a fresh (clean-chain) regtest network.
    self.num_nodes = 1
    self.setup_clean_chain = True
    # NOTE(review): presumably exempts the test connection from tx-relay
    # penalties/bans — confirm semantics in the test framework.
    self.noban_tx_relay = True
def create_self_transfer_from_utxo(self, input_tx):
    """Spend output 0 of input_tx back to the miniwallet; return the new tx."""
    source_utxo = self.miniwallet.get_utxo(
        txid=input_tx.rehash(), mark_as_spent=False
    )
    return self.miniwallet.create_self_transfer(utxo_to_spend=source_utxo)["tx"]
def spend_tx(self, prev_tx):
    """Build a padded tx spending output 0 of prev_tx (1000 sat fee) to an
    unspendable address, inheriting prev_tx's nVersion."""
    amount = (prev_tx.vout[0].nValue - 1000) / XEC
    rawtx = self.nodes[0].createrawtransaction(
        [{"txid": prev_tx.hash, "vout": 0}],
        {ADDRESS_ECREG_UNSPENDABLE: amount},
    )
    spendtx = FromHex(CTransaction(), rawtx)
    spendtx.nVersion = prev_tx.nVersion
    pad_tx(spendtx)
    return spendtx
def create_bip112special(self, txid, txversion):
    """Return a padded tx whose output script is `-1 OP_CSV OP_DROP OP_TRUE`.

    NOTE(review): despite the name, `txid` is a transaction *object*, not a
    hex id — it is forwarded to create_self_transfer_from_utxo, which calls
    .rehash() on it.
    """
    tx = self.create_self_transfer_from_utxo(txid)
    tx.nVersion = txversion
    tx.vout[0].scriptPubKey = CScript(
        [-1, OP_CHECKSEQUENCEVERIFY, OP_DROP, OP_TRUE]
    )
    pad_tx(tx)
    return tx
def send_generic_input_tx(self, coinbases):
    """Pop one coinbase block hash from `coinbases`, spend its coinbase via
    the miniwallet and broadcast; return the resulting tx object."""
    coinbase_txid = self.nodes[0].getblock(coinbases.pop(), 2)["tx"][0]["txid"]
    spendable = self.miniwallet.get_utxo(txid=coinbase_txid)
    sent = self.miniwallet.send_self_transfer(
        from_node=self.nodes[0], utxo_to_spend=spendable
    )
    return sent["tx"]
def create_bip68txs(self, bip68inputs, txversion, locktime_delta=0):
    """Returns a list of bip68 transactions with different bits set."""
    assert len(bip68inputs) >= 16
    txs = []
    # One tx per combination of the four sequence flag bits.
    for i, (sdf, srhb, stf, srlb) in enumerate(product([True, False], repeat=4)):
        tx = self.create_self_transfer_from_utxo(bip68inputs[i])
        tx.nVersion = txversion
        tx.vin[0].nSequence = relative_locktime(sdf, srhb, stf, srlb) + locktime_delta
        pad_tx(tx)
        txs.append({"tx": tx, "sdf": sdf, "stf": stf})
    return txs
def create_bip112txs(self, bip112inputs, varyOP_CSV, txversion, locktime_delta=0):
    """Returns a list of bip112 transactions with different bits set."""
    assert len(bip112inputs) >= 16
    txs = []
    for i, (sdf, srhb, stf, srlb) in enumerate(product([True, False], repeat=4)):
        locktime = relative_locktime(sdf, srhb, stf, srlb)
        tx = self.create_self_transfer_from_utxo(bip112inputs[i])
        if varyOP_CSV:
            # if varying OP_CSV, nSequence is fixed
            tx.vin[0].nSequence = BASE_RELATIVE_LOCKTIME + locktime_delta
            csv_arg = locktime
        else:
            # vary nSequence instead, OP_CSV is fixed
            tx.vin[0].nSequence = locktime + locktime_delta
            csv_arg = BASE_RELATIVE_LOCKTIME
        tx.nVersion = txversion
        tx.vout[0].scriptPubKey = CScript(
            [csv_arg, OP_CHECKSEQUENCEVERIFY, OP_DROP, OP_TRUE]
        )
        pad_tx(tx)
        txs.append({"tx": tx, "sdf": sdf, "stf": stf})
    return txs
def generate_blocks(self, number):
    """Create `number` empty test blocks on top of self.tip, advancing
    self.tip / self.tipheight / self.last_block_time; return the blocks."""
    blocks = []
    for _ in range(number):
        new_block = self.create_test_block([])
        blocks.append(new_block)
        self.last_block_time += 600
        self.tip = new_block.sha256
        self.tipheight += 1
    return blocks
def create_test_block(self, txs, version=536870912):
    """Build and solve a block on self.tip containing `txs` (CTOR-ordered).

    The timestamp is last_block_time + 600; the caller is responsible for
    advancing self.tip / tipheight / last_block_time afterwards.
    """
    block = create_block(
        self.tip, create_coinbase(self.tipheight + 1), self.last_block_time + 600
    )
    block.nVersion = version
    block.vtx.extend(txs)
    make_conform_to_ctor(block)
    block.hashMerkleRoot = block.calc_merkle_root()
    # The stray diff-marker line ("- block.rehash()") was removed; an
    # explicit rehash here is redundant since solve() recomputes the block
    # hash while grinding the nonce.
    block.solve()
    return block
# Create a block with given txs, and spend these txs in the same block.
# Spending utxos in the same block is OK as long as nSequence is not enforced.
# Otherwise a number of intermediate blocks should be generated, and this
# method should not be used.
def create_test_block_spend_utxos(self, txs, version=536870912):
    """Like create_test_block, but also spends each of `txs` in the same block."""
    block = self.create_test_block(txs, version)
    block.vtx.extend([self.spend_tx(tx) for tx in txs])
    make_conform_to_ctor(block)
    block.hashMerkleRoot = block.calc_merkle_root()
    # Dropped the redundant rehash() before solve() for consistency with
    # create_test_block: solve() recomputes the block hash itself.
    block.solve()
    return block
def send_blocks(self, blocks, success=True):
    """Sends blocks to test node. Syncs and verifies that tip has advanced to most recent block.

    Call with success = False if the tip shouldn't advance to the most recent block.
    """
    self.helper_peer.send_blocks_and_test(blocks, self.nodes[0], success=success)
def run_test(self):
    """Drive the whole BIP 68/112/113 activation scenario (see module docstring)."""
    self.helper_peer = self.nodes[0].add_p2p_connection(P2PDataStore())
    self.miniwallet = MiniWallet(self.nodes[0])

    self.log.info("Generate blocks in the past for coinbase outputs.")
    # Enough to build up to 1000 blocks 10 minutes apart without worrying
    # about getting into the future
    long_past_time = int(time.time()) - 600 * 1000
    # Enough so that the generated blocks will still all be before
    # long_past_time
    self.nodes[0].setmocktime(long_past_time - 100)
    # 82 blocks generated for inputs
    self.coinbase_blocks = self.generate(self.miniwallet, 1 + 16 + 2 * 32 + 1)
    # Set time back to present so yielded blocks aren't in the future as
    # we advance last_block_time
    self.nodes[0].setmocktime(0)
    # height of the next block to build
    self.tipheight = 82
    self.last_block_time = long_past_time
    self.tip = int(self.nodes[0].getbestblockhash(), 16)

    # CSV is not activated yet.
    assert_equal(get_csv_status(self.nodes[0]), False)

    # Generate 489 more version 4 blocks
    test_blocks = self.generate_blocks(489)
    # Test #1
    self.send_blocks(test_blocks)

    # Still not activated.
    assert_equal(get_csv_status(self.nodes[0]), False)

    # Inputs at height = 572
    #
    # Put inputs for all tests in the chain at height 572 (tip now = 571) (time increases by 600s per block)
    # Note we reuse inputs for v1 and v2 txs so must test these separately
    # 16 normal inputs
    bip68inputs = []
    for _ in range(16):
        bip68inputs.append(self.send_generic_input_tx(self.coinbase_blocks))

    # 2 sets of 16 inputs with 10 OP_CSV OP_DROP (actually will be
    # prepended to spending scriptSig)
    bip112basicinputs = []
    for _ in range(2):
        inputs = []
        for _ in range(16):
            inputs.append(self.send_generic_input_tx(self.coinbase_blocks))
        bip112basicinputs.append(inputs)

    # 2 sets of 16 varied inputs with (relative_lock_time) OP_CSV OP_DROP
    # (actually will be prepended to spending scriptSig)
    bip112diverseinputs = []
    for _ in range(2):
        inputs = []
        for _ in range(16):
            inputs.append(self.send_generic_input_tx(self.coinbase_blocks))
        bip112diverseinputs.append(inputs)

    # 1 special input with -1 OP_CSV OP_DROP (actually will be prepended to
    # spending scriptSig)
    bip112specialinput = self.send_generic_input_tx(self.coinbase_blocks)
    # 1 normal input
    bip113input = self.send_generic_input_tx(self.coinbase_blocks)

    self.nodes[0].setmocktime(self.last_block_time + 600)
    # 1 block generated for inputs to be in chain at height 572
    inputblockhash = self.generate(self.nodes[0], 1)[0]
    self.nodes[0].setmocktime(0)
    self.tip = int(inputblockhash, 16)
    self.tipheight += 1
    self.last_block_time += 600
    assert_equal(len(self.nodes[0].getblock(inputblockhash, True)["tx"]), 82 + 1)

    # 2 more version 4 blocks
    test_blocks = self.generate_blocks(2)
    # Test #2
    self.send_blocks(test_blocks)

    self.log.info(
        "Not yet activated, height = 574 (will activate for block 576, not 575)"
    )
    assert_equal(get_csv_status(self.nodes[0]), False)

    # Test both version 1 and version 2 transactions for all tests
    # BIP113 test transaction will be modified before each use to
    # put in appropriate block time
    bip113tx_v1 = self.create_self_transfer_from_utxo(bip113input)
    bip113tx_v1.vin[0].nSequence = 0xFFFFFFFE
    bip113tx_v1.nVersion = 1
    bip113tx_v2 = self.create_self_transfer_from_utxo(bip113input)
    bip113tx_v2.vin[0].nSequence = 0xFFFFFFFE
    bip113tx_v2.nVersion = 2

    # For BIP68 test all 16 relative sequence locktimes
    bip68txs_v1 = self.create_bip68txs(bip68inputs, 1)
    bip68txs_v2 = self.create_bip68txs(bip68inputs, 2)

    # For BIP112 test:
    # 16 relative sequence locktimes of 10 against 10 OP_CSV OP_DROP inputs
    bip112txs_vary_nSequence_v1 = self.create_bip112txs(
        bip112basicinputs[0], False, 1
    )
    bip112txs_vary_nSequence_v2 = self.create_bip112txs(
        bip112basicinputs[0], False, 2
    )
    # 16 relative sequence locktimes of 9 against 10 OP_CSV OP_DROP inputs
    bip112txs_vary_nSequence_9_v1 = self.create_bip112txs(
        bip112basicinputs[1], False, 1, -1
    )
    bip112txs_vary_nSequence_9_v2 = self.create_bip112txs(
        bip112basicinputs[1], False, 2, -1
    )
    # sequence lock time of 10 against 16 (relative_lock_time) OP_CSV
    # OP_DROP inputs
    bip112txs_vary_OP_CSV_v1 = self.create_bip112txs(
        bip112diverseinputs[0], True, 1
    )
    bip112txs_vary_OP_CSV_v2 = self.create_bip112txs(
        bip112diverseinputs[0], True, 2
    )
    # sequence lock time of 9 against 16 (relative_lock_time) OP_CSV
    # OP_DROP inputs
    bip112txs_vary_OP_CSV_9_v1 = self.create_bip112txs(
        bip112diverseinputs[1], True, 1, -1
    )
    bip112txs_vary_OP_CSV_9_v2 = self.create_bip112txs(
        bip112diverseinputs[1], True, 2, -1
    )
    # -1 OP_CSV OP_DROP input
    bip112tx_special_v1 = self.create_bip112special(bip112specialinput, 1)
    bip112tx_special_v2 = self.create_bip112special(bip112specialinput, 2)

    self.log.info("TESTING")

    self.log.info("Pre-Soft Fork Tests. All txs should pass.")
    self.log.info("Test version 1 txs")

    success_txs = []
    # add BIP113 tx and -1 CSV tx
    # = MTP of prior block (not <) but < time put on current block
    bip113tx_v1.nLockTime = self.last_block_time - 600 * 5
    bip113tx_v1.rehash()
    success_txs.append(bip113tx_v1)
    success_txs.append(bip112tx_special_v1)
    success_txs.append(self.spend_tx(bip112tx_special_v1))
    # add BIP 68 txs
    success_txs.extend(all_rlt_txs(bip68txs_v1))
    # add BIP 112 with seq=10 txs
    success_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_v1))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_nSequence_v1)]
    )
    success_txs.extend(all_rlt_txs(bip112txs_vary_OP_CSV_v1))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_OP_CSV_v1)]
    )
    # try BIP 112 with seq=9 txs
    success_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_9_v1))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_nSequence_9_v1)]
    )
    success_txs.extend(all_rlt_txs(bip112txs_vary_OP_CSV_9_v1))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_OP_CSV_9_v1)]
    )
    # Test #3
    self.send_blocks([self.create_test_block(success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    self.log.info("Test version 2 txs")

    success_txs = []
    # add BIP113 tx and -1 CSV tx
    # = MTP of prior block (not <) but < time put on current block
    bip113tx_v2.nLockTime = self.last_block_time - 600 * 5
    bip113tx_v2.rehash()
    success_txs.append(bip113tx_v2)
    success_txs.append(bip112tx_special_v2)
    success_txs.append(self.spend_tx(bip112tx_special_v2))
    # add BIP 68 txs
    success_txs.extend(all_rlt_txs(bip68txs_v2))
    # add BIP 112 with seq=10 txs
    success_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_v2))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_nSequence_v2)]
    )
    success_txs.extend(all_rlt_txs(bip112txs_vary_OP_CSV_v2))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_OP_CSV_v2)]
    )
    # try BIP 112 with seq=9 txs
    success_txs.extend(all_rlt_txs(bip112txs_vary_nSequence_9_v2))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_nSequence_9_v2)]
    )
    success_txs.extend(all_rlt_txs(bip112txs_vary_OP_CSV_9_v2))
    success_txs.extend(
        [self.spend_tx(tx) for tx in all_rlt_txs(bip112txs_vary_OP_CSV_9_v2)]
    )
    # Test #4
    self.send_blocks([self.create_test_block(success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # 1 more version 4 block to get us to height 575 so the fork should
    # now be active for the next block
    test_blocks = self.generate_blocks(1)
    # Test #5
    self.send_blocks(test_blocks)
    assert_equal(get_csv_status(self.nodes[0]), False)

    self.generate(self.nodes[0], 1)
    assert_equal(get_csv_status(self.nodes[0]), True)
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    self.log.info("Post-Soft Fork Tests.")

    self.log.info("BIP 113 tests")
    # BIP 113 tests should now fail regardless of version number
    # if nLockTime isn't satisfied by new rules
    # = MTP of prior block (not <) but < time put on current block
    bip113tx_v1.nLockTime = self.last_block_time - 600 * 5
    bip113tx_v1.rehash()
    # = MTP of prior block (not <) but < time put on current block
    bip113tx_v2.nLockTime = self.last_block_time - 600 * 5
    bip113tx_v2.rehash()
    for bip113tx in [bip113tx_v1, bip113tx_v2]:
        # Test #6, Test #7
        self.send_blocks([self.create_test_block([bip113tx])], success=False)
    # BIP 113 tests should now pass if the locktime is < MTP
    # < MTP of prior block
    bip113tx_v1.nLockTime = self.last_block_time - 600 * 5 - 1
    bip113tx_v1.rehash()
    # < MTP of prior block
    bip113tx_v2.nLockTime = self.last_block_time - 600 * 5 - 1
    bip113tx_v2.rehash()
    for bip113tx in [bip113tx_v1, bip113tx_v2]:
        # Test #8, Test #9
        self.send_blocks([self.create_test_block([bip113tx])])
        self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # Next block height = 580 after 4 blocks of random version
    test_blocks = self.generate_blocks(4)
    # Test #10
    self.send_blocks(test_blocks)

    self.log.info("BIP 68 tests")
    self.log.info("Test version 1 txs - all should still pass")

    success_txs = []
    success_txs.extend(all_rlt_txs(bip68txs_v1))
    # Test #11
    self.send_blocks([self.create_test_block(success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    self.log.info("Test version 2 txs")

    # All txs with SEQUENCE_LOCKTIME_DISABLE_FLAG set pass
    bip68success_txs = [tx["tx"] for tx in bip68txs_v2 if tx["sdf"]]
    # Test #12
    self.send_blocks([self.create_test_block(bip68success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # All txs without flag fail as we are at delta height = 8 < 10 and
    # delta time = 8 * 600 < 10 * 512
    bip68timetxs = [tx["tx"] for tx in bip68txs_v2 if not tx["sdf"] and tx["stf"]]
    for tx in bip68timetxs:
        # Test #13 - Test #16
        self.send_blocks([self.create_test_block([tx])], success=False)
    bip68heighttxs = [
        tx["tx"] for tx in bip68txs_v2 if not tx["sdf"] and not tx["stf"]
    ]
    for tx in bip68heighttxs:
        # Test #17 - Test #20
        self.send_blocks([self.create_test_block([tx])], success=False)

    # Advance one block to 581
    test_blocks = self.generate_blocks(1)
    # Test #21
    self.send_blocks(
        test_blocks,
    )

    # Height txs should fail and time txs should now pass 9 * 600 > 10 *
    # 512
    bip68success_txs.extend(bip68timetxs)
    # Test #22
    self.send_blocks([self.create_test_block(bip68success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())
    for tx in bip68heighttxs:
        # Test #23 - Test #26
        self.send_blocks([self.create_test_block([tx])], success=False)

    # Advance one block to 582
    test_blocks = self.generate_blocks(1)
    # Test #27
    self.send_blocks(test_blocks)

    # All BIP 68 txs should pass
    bip68success_txs.extend(bip68heighttxs)
    # Test #28
    self.send_blocks([self.create_test_block(bip68success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    self.log.info("BIP 112 tests")
    self.log.info("Test version 1 txs")

    # -1 OP_CSV tx should fail
    # Test #29
    self.send_blocks(
        [self.create_test_block_spend_utxos([bip112tx_special_v1])],
        success=False,
    )
    # If SEQUENCE_LOCKTIME_DISABLE_FLAG is set in argument to OP_CSV,
    # version 1 txs should still pass
    success_txs = [tx["tx"] for tx in bip112txs_vary_OP_CSV_v1 if tx["sdf"]]
    success_txs += [tx["tx"] for tx in bip112txs_vary_OP_CSV_9_v1 if tx["sdf"]]
    # Test #30
    self.send_blocks([self.create_test_block_spend_utxos(success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # If SEQUENCE_LOCKTIME_DISABLE_FLAG is unset in argument to OP_CSV,
    # version 1 txs should now fail
    fail_txs = all_rlt_txs(bip112txs_vary_nSequence_v1)
    fail_txs += all_rlt_txs(bip112txs_vary_nSequence_9_v1)
    # BUGFIX: the first of these two lines previously duplicated the
    # vary_OP_CSV_9 set, so the (non-9) vary_OP_CSV v1 txs were never
    # exercised here.
    fail_txs += [tx["tx"] for tx in bip112txs_vary_OP_CSV_v1 if not tx["sdf"]]
    fail_txs += [tx["tx"] for tx in bip112txs_vary_OP_CSV_9_v1 if not tx["sdf"]]
    for tx in fail_txs:
        # Test #31 - Test #78
        self.send_blocks([self.create_test_block_spend_utxos([tx])], success=False)

    self.log.info("Test version 2 txs")

    # -1 OP_CSV tx should fail
    # Test #79
    self.send_blocks(
        [self.create_test_block_spend_utxos([bip112tx_special_v2])],
        success=False,
    )

    # If SEQUENCE_LOCKTIME_DISABLE_FLAG is set in argument to OP_CSV,
    # version 2 txs should pass
    success_txs = [tx["tx"] for tx in bip112txs_vary_OP_CSV_v2 if tx["sdf"]]
    success_txs += [tx["tx"] for tx in bip112txs_vary_OP_CSV_9_v2 if tx["sdf"]]
    # Test #80
    self.send_blocks([self.create_test_block_spend_utxos(success_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # SEQUENCE_LOCKTIME_DISABLE_FLAG is unset in argument to OP_CSV for all
    # remaining txs ##

    # All txs with nSequence 9 should fail either due to earlier mismatch
    # or failing the CSV check
    fail_txs = all_rlt_txs(bip112txs_vary_nSequence_9_v2)
    fail_txs += [tx["tx"] for tx in bip112txs_vary_OP_CSV_9_v2 if not tx["sdf"]]
    for tx in fail_txs:
        # Test #81 - Test #104
        self.send_blocks([self.create_test_block_spend_utxos([tx])], success=False)

    # If SEQUENCE_LOCKTIME_DISABLE_FLAG is set in nSequence, tx should fail
    fail_txs = [tx["tx"] for tx in bip112txs_vary_nSequence_v2 if tx["sdf"]]
    for tx in fail_txs:
        # Test #105 - Test #112
        self.send_blocks([self.create_test_block_spend_utxos([tx])], success=False)

    # If sequencelock types mismatch, tx should fail
    fail_txs = [
        tx["tx"]
        for tx in bip112txs_vary_nSequence_v2
        if not tx["sdf"] and tx["stf"]
    ]
    fail_txs += [
        tx["tx"] for tx in bip112txs_vary_OP_CSV_v2 if not tx["sdf"] and tx["stf"]
    ]
    for tx in fail_txs:
        # Test #113 - Test #120
        self.send_blocks([self.create_test_block_spend_utxos([tx])], success=False)

    # Remaining txs should pass, just test masking works properly
    success_txs = [
        tx["tx"]
        for tx in bip112txs_vary_nSequence_v2
        if not tx["sdf"] and not tx["stf"]
    ]
    success_txs += [
        tx["tx"]
        for tx in bip112txs_vary_OP_CSV_v2
        if not tx["sdf"] and not tx["stf"]
    ]
    # Test #121
    self.send_blocks([self.create_test_block(success_txs)])

    # Spending the previous block utxos requires a difference of 10 blocks (nSequence = 10).
    # Generate 9 blocks then spend in the 10th
    block = self.nodes[0].getbestblockhash()
    self.last_block_time += 600
    self.tip = int(f"0x{block}", 0)
    self.tipheight += 1
    # Test #122
    self.send_blocks(self.generate_blocks(9))

    spend_txs = []
    for tx in success_txs:
        raw_tx = self.spend_tx(tx)
        raw_tx.vin[0].nSequence = BASE_RELATIVE_LOCKTIME
        raw_tx.rehash()
        spend_txs.append(raw_tx)
    # Test #123
    self.send_blocks([self.create_test_block(spend_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # Additional test, of checking that comparison of two time types works
    # properly
    time_txs = []
    for tx in [
        tx["tx"] for tx in bip112txs_vary_OP_CSV_v2 if not tx["sdf"] and tx["stf"]
    ]:
        tx.rehash()
        time_txs.append(tx)

    # Test #124
    self.send_blocks([self.create_test_block(time_txs)])

    # Spending the previous block utxos requires a block time difference of
    # at least 10 * 512s (nSequence = 10).
    # Generate 8 blocks then spend in the 9th (9 * 600 > 10 * 512)
    block = self.nodes[0].getbestblockhash()
    self.last_block_time += 600
    self.tip = int(f"0x{block}", 0)
    self.tipheight += 1
    # Test #125
    self.send_blocks(self.generate_blocks(8))

    spend_txs = []
    for tx in time_txs:
        raw_tx = self.spend_tx(tx)
        raw_tx.vin[0].nSequence = BASE_RELATIVE_LOCKTIME | SEQ_TYPE_FLAG
        raw_tx.rehash()
        spend_txs.append(raw_tx)
    # Test #126
    self.send_blocks([self.create_test_block(spend_txs)])
    self.nodes[0].invalidateblock(self.nodes[0].getbestblockhash())

    # TODO: Test empty stack fails
if __name__ == "__main__":
    # Entry point when executed as a standalone script.
    BIP68_112_113Test().main()
diff --git a/test/functional/feature_dersig.py b/test/functional/feature_dersig.py
index ba3e56830..435bfc982 100644
--- a/test/functional/feature_dersig.py
+++ b/test/functional/feature_dersig.py
@@ -1,128 +1,125 @@
# Copyright (c) 2015-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test BIP66 (DER SIG).
Test that the DERSIG soft-fork activates at (regtest) height 1251.
"""
from test_framework.blocktools import create_block, create_coinbase
from test_framework.messages import msg_block
from test_framework.p2p import P2PInterface
from test_framework.script import CScript
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
from test_framework.wallet import MiniWallet, MiniWalletMode
# Height at which the DERSIG (BIP66) soft fork activates on regtest.
DERSIG_HEIGHT = 1251


# A canonical signature consists of:
# <30> <total len> <02> <len R> <R> <02> <len S> <S> <hashtype>
def unDERify(tx):
    """
    Make the signature in vin 0 of a tx non-DER-compliant,
    by adding padding after the S-value.
    """
    original = CScript(tx.vin[0].scriptSig)
    mangled = []
    for element in original:
        if mangled:
            mangled.append(element)
        else:
            # First push is the signature: splice a zero byte in front of
            # the trailing hashtype byte to break strict DER encoding.
            mangled.append(element[0:-1] + b"\0" + element[-1:])
    tx.vin[0].scriptSig = CScript(mangled)
class BIP66Test(BitcoinTestFramework):
    def set_test_params(self):
        self.num_nodes = 1
        self.noban_tx_relay = True
        self.setup_clean_chain = True
        self.rpc_timeout = 240

    def create_tx(self, input_txid):
        """Return an (unsent) miniwallet self-transfer spending input_txid."""
        utxo_to_spend = self.miniwallet.get_utxo(txid=input_txid, mark_as_spent=False)
        return self.miniwallet.create_self_transfer(utxo_to_spend=utxo_to_spend)["tx"]

    def run_test(self):
        # Stray diff-marker lines ("- block.rehash()") were removed from
        # this method; solve() recomputes the block hash itself, so no
        # explicit rehash is needed before solving.
        peer = self.nodes[0].add_p2p_connection(P2PInterface())
        self.miniwallet = MiniWallet(self.nodes[0], mode=MiniWalletMode.RAW_P2PK)

        self.log.info(f"Mining {DERSIG_HEIGHT - 1} blocks")
        self.coinbase_txids = [
            self.nodes[0].getblock(b)["tx"][0]
            for b in self.generate(self.miniwallet, DERSIG_HEIGHT - 1)
        ]

        self.log.info("Test that blocks must now be at least version 3")
        tip = self.nodes[0].getbestblockhash()
        block_time = self.nodes[0].getblockheader(tip)["mediantime"] + 1
        block = create_block(int(tip, 16), create_coinbase(DERSIG_HEIGHT), block_time)
        block.nVersion = 2
        block.solve()

        with self.nodes[0].assert_debug_log(
            expected_msgs=[f"{block.hash}, bad-version(0x00000002)"]
        ):
            peer.send_and_ping(msg_block(block))
            assert_equal(self.nodes[0].getbestblockhash(), tip)
            peer.sync_with_ping()

        self.log.info(
            "Test that transactions with non-DER signatures cannot appear in a block"
        )
        block.nVersion = 3

        spendtx = self.create_tx(self.coinbase_txids[1])
        unDERify(spendtx)
        spendtx.rehash()

        # First we show that this tx is valid except for DERSIG by getting it
        # rejected from the mempool for exactly that reason.
        assert_equal(
            [
                {
                    "txid": spendtx.hash,
                    "allowed": False,
                    "reject-reason": (
                        "mandatory-script-verify-flag-failed (Non-canonical DER"
                        " signature)"
                    ),
                }
            ],
            self.nodes[0].testmempoolaccept(
                rawtxs=[spendtx.serialize().hex()], maxfeerate=0
            ),
        )

        # Now we verify that a block with this transaction is also invalid.
        block.vtx.append(spendtx)
        block.hashMerkleRoot = block.calc_merkle_root()
        block.solve()

        with self.nodes[0].assert_debug_log(
            expected_msgs=[f"ConnectBlock {block.hash} failed, blk-bad-inputs"]
        ):
            peer.send_and_ping(msg_block(block))
            assert_equal(self.nodes[0].getbestblockhash(), tip)
            peer.sync_with_ping()

        self.log.info(
            "Test that a version 3 block with a DERSIG-compliant transaction is"
            " accepted"
        )
        block.vtx[1] = self.create_tx(self.coinbase_txids[1])
        block.hashMerkleRoot = block.calc_merkle_root()
        block.solve()

        peer.send_and_ping(msg_block(block))
        assert_equal(int(self.nodes[0].getbestblockhash(), 16), block.sha256)
if __name__ == "__main__":
    # Entry point when executed as a standalone script.
    BIP66Test().main()
diff --git a/test/functional/p2p_invalid_block.py b/test/functional/p2p_invalid_block.py
index 45cb7bd85..bc8feacf7 100644
--- a/test/functional/p2p_invalid_block.py
+++ b/test/functional/p2p_invalid_block.py
@@ -1,187 +1,183 @@
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test node responses to invalid blocks.
In this test we connect to one node over p2p, and test block requests:
1) Valid blocks should be requested and become chain tip.
2) Invalid block with duplicated transaction should be re-requested.
3) Invalid block with bad coinbase value should be rejected and not
re-requested.
4) Invalid block due to future timestamp is later accepted when that timestamp
becomes valid.
"""
import copy
import time
from test_framework.blocktools import (
COINBASE_MATURITY,
MAX_FUTURE_BLOCK_TIME,
create_block,
create_coinbase,
create_tx_with_script,
make_conform_to_ctor,
)
from test_framework.messages import COIN
from test_framework.p2p import P2PDataStore
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
class InvalidBlockRequestTest(BitcoinTestFramework):
    """Check that a node rejects structurally invalid blocks over p2p.

    Covers merkle-root malleability (CVE-2012-2459), duplicate-input
    inflation (CVE-2018-17144), bad coinbase value, and future timestamps.
    """

    def set_test_params(self):
        self.num_nodes = 1
        self.setup_clean_chain = True
        self.noban_tx_relay = True

    def run_test(self):
        # Add p2p connection to node0
        node = self.nodes[0]  # convenience reference to the node
        peer = node.add_p2p_connection(P2PDataStore())

        best_block = node.getblock(node.getbestblockhash())
        tip = int(node.getbestblockhash(), 16)
        height = best_block["height"] + 1
        block_time = best_block["time"] + 1

        self.log.info("Create a new block with an anyone-can-spend coinbase")
        height = 1
        block = create_block(tip, create_coinbase(height), block_time)
        block.solve()
        # Save the coinbase for later
        block1 = block
        tip = block.sha256
        peer.send_blocks_and_test([block1], node, success=True)

        self.log.info("Mature the block.")
        self.generatetoaddress(
            node, COINBASE_MATURITY, node.get_deterministic_priv_key().address
        )

        best_block = node.getblock(node.getbestblockhash())
        tip = int(node.getbestblockhash(), 16)
        height = best_block["height"] + 1
        block_time = best_block["time"] + 1

        # Use merkle-root malleability to generate an invalid block with
        # same blockheader (CVE-2012-2459).
        # Manufacture a block with 3 transactions (coinbase, spend of prior
        # coinbase, spend of that spend). Duplicate the 3rd transaction to
        # leave merkle root and blockheader unchanged but invalidate the
        # block. For more information on merkle-root malleability see
        # src/consensus/merkle.cpp.
        self.log.info("Test merkle root malleability.")

        block2 = create_block(tip, create_coinbase(height), block_time)
        block_time += 1

        # b'0x51' is OP_TRUE
        tx1 = create_tx_with_script(block1.vtx[0], 0, script_sig=b"", amount=50 * COIN)
        tx2 = create_tx_with_script(tx1, 0, script_sig=b"\x51", amount=50 * COIN)

        block2.vtx.extend([tx1, tx2])
        # Coinbase stays first; the remaining txs are sorted by txid (CTOR).
        block2.vtx = [block2.vtx[0]] + sorted(
            block2.vtx[1:], key=lambda tx: tx.get_id()
        )
        block2.hashMerkleRoot = block2.calc_merkle_root()
        block2.solve()
        orig_hash = block2.sha256
        block2_orig = copy.deepcopy(block2)

        # Mutate block 2: duplicating the last tx leaves the merkle root and
        # proof-of-work unchanged while making the block invalid.
        block2.vtx.append(block2.vtx[2])
        assert_equal(block2.hashMerkleRoot, block2.calc_merkle_root())
        assert_equal(orig_hash, block2.rehash())
        assert block2_orig.vtx != block2.vtx

        peer.send_blocks_and_test(
            [block2], node, success=False, reject_reason="bad-txns-duplicate"
        )

        # Check transactions for duplicate inputs (CVE-2018-17144)
        self.log.info("Test duplicate input block.")

        block2_dup = copy.deepcopy(block2_orig)
        block2_dup.vtx[2].vin.append(block2_dup.vtx[2].vin[0])
        block2_dup.vtx[2].rehash()
        make_conform_to_ctor(block2_dup)
        block2_dup.hashMerkleRoot = block2_dup.calc_merkle_root()
        block2_dup.solve()
        peer.send_blocks_and_test(
            [block2_dup], node, success=False, reject_reason="bad-txns-inputs-duplicate"
        )

        self.log.info("Test very broken block.")
        block3 = create_block(tip, create_coinbase(height), block_time)
        block_time += 1
        block3.vtx[0].vout[0].nValue = 100 * COIN  # Too high!
        block3.vtx[0].sha256 = None
        block3.vtx[0].calc_sha256()
        block3.hashMerkleRoot = block3.calc_merkle_root()
        block3.solve()

        peer.send_blocks_and_test(
            [block3], node, success=False, reject_reason="bad-cb-amount"
        )

        # Complete testing of CVE-2012-2459 by sending the original block.
        # It should be accepted even though it has the same hash as the
        # mutated one.
        self.log.info(
            "Test accepting original block after rejecting its mutated version."
        )
        peer.send_blocks_and_test([block2_orig], node, success=True, timeout=5)

        # Update tip info
        height += 1
        block_time += 1
        tip = int(block2_orig.hash, 16)

        # Complete testing of CVE-2018-17144, by checking for the inflation
        # bug. Create a block that spends the output of a tx in a previous
        # block.
        block4 = create_block(tip, create_coinbase(height), block_time)
        tx3 = create_tx_with_script(tx2, 0, script_sig=b"\x51", amount=50 * COIN)

        # Duplicates input
        tx3.vin.append(tx3.vin[0])
        tx3.rehash()
        block4.vtx.append(tx3)
        make_conform_to_ctor(block4)
        block4.hashMerkleRoot = block4.calc_merkle_root()
        block4.solve()
        self.log.info("Test inflation by duplicating input")
        peer.send_blocks_and_test(
            [block4], node, success=False, reject_reason="bad-txns-inputs-duplicate"
        )

        self.log.info(
            "Test accepting identical block after rejecting it due to a future"
            " timestamp."
        )
        t = int(time.time())
        node.setmocktime(t)
        # Set block time +1 second past max future validity
        block = create_block(
            tip, create_coinbase(height), t + MAX_FUTURE_BLOCK_TIME + 1
        )
        block.hashMerkleRoot = block.calc_merkle_root()
        block.solve()
        # Need force_send because the block will get rejected without a
        # getdata otherwise
        peer.send_blocks_and_test(
            [block], node, force_send=True, success=False, reject_reason="time-too-new"
        )
        # Advancing mocktime by one second makes the same block valid.
        node.setmocktime(t + 1)
        peer.send_blocks_and_test([block], node, success=True)
# Standard functional-test entry point: run the test when executed directly.
if __name__ == "__main__":
    InvalidBlockRequestTest().main()
diff --git a/test/functional/wallet_resendwallettransactions.py b/test/functional/wallet_resendwallettransactions.py
index dfc38ca05..7495e7b31 100644
--- a/test/functional/wallet_resendwallettransactions.py
+++ b/test/functional/wallet_resendwallettransactions.py
@@ -1,87 +1,86 @@
# Copyright (c) 2017-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test that the wallet resends transactions periodically."""
import time
from test_framework.blocktools import create_block, create_coinbase
from test_framework.messages import ToHex
from test_framework.p2p import P2PTxInvStore
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
class ResendWalletTransactionsTest(BitcoinTestFramework):
    """Check that the wallet periodically rebroadcasts unconfirmed txs.

    Uses mocktime and the scheduler hook to fast-forward past the
    rebroadcast window instead of waiting in real time.
    """

    def set_test_params(self):
        self.num_nodes = 1

    def skip_test_if_missing_module(self):
        self.skip_if_no_wallet()

    def run_test(self):
        node = self.nodes[0]  # alias
        peer_first = node.add_p2p_connection(P2PTxInvStore())

        self.log.info("Create a new transaction and wait until it's broadcast")
        txid = node.sendtoaddress(node.getnewaddress(), 1_000_000)

        # Wallet rebroadcast is first scheduled 1 sec after startup (see
        # nNextResend in ResendWalletTransactions()). Tell scheduler to call
        # MaybeResendWalletTxn now to initialize nNextResend before the first
        # setmocktime call below.
        node.mockscheduler(1)

        # Can take a few seconds due to transaction trickling
        peer_first.wait_for_broadcast([txid])

        # Add a second peer since txs aren't rebroadcast to the same peer (see
        # m_tx_inventory_known_filter)
        peer_second = node.add_p2p_connection(P2PTxInvStore())

        self.log.info("Create a block")
        # Create and submit a block without the transaction.
        # Transactions are only rebroadcast if there has been a block at
        # least five minutes after the last time we tried to broadcast. Use
        # mocktime and give an extra minute to be sure.
        block_time = int(time.time()) + 6 * 60
        node.setmocktime(block_time)
        block = create_block(
            int(node.getbestblockhash(), 16),
            create_coinbase(node.getblockcount() + 1),
            block_time,
        )
        block.solve()
        node.submitblock(ToHex(block))

        # Set correct m_best_block_time, which is used in
        # ResendWalletTransactions
        node.syncwithvalidationinterfacequeue()
        now = int(time.time())

        # Transaction should not be rebroadcast within first 12 hours
        # Leave 2 mins for buffer
        twelve_hrs = 12 * 60 * 60
        two_min = 2 * 60
        node.setmocktime(now + twelve_hrs - two_min)
        # Tell scheduler to call MaybeResendWalletTxn now
        node.mockscheduler(1)
        assert_equal(int(txid, 16) in peer_second.get_invs(), False)

        self.log.info("Bump time & check that transaction is rebroadcast")
        # Transaction should be rebroadcast approximately 24 hours in the
        # future, but can range from 12-36. So bump 36 hours to be sure.
        with node.assert_debug_log(
            ["ResendWalletTransactions: resubmit 1 unconfirmed transactions"]
        ):
            node.setmocktime(now + 36 * 60 * 60)
            # Tell scheduler to call MaybeResendWalletTxn now.
            node.mockscheduler(1)
        # Give some time for trickle to occur
        node.setmocktime(now + 36 * 60 * 60 + 600)
        peer_second.wait_for_broadcast([txid])
# Standard functional-test entry point: run the test when executed directly.
if __name__ == "__main__":
    ResendWalletTransactionsTest().main()

File Metadata

Mime Type
text/x-diff
Expires
Wed, May 21, 22:59 (23 h, 29 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5866117
Default Alt Text
(125 KB)

Event Timeline