Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F28093933
feature_bip68_sequence.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
19 KB
Subscribers
None
feature_bip68_sequence.py
View Options
# Copyright (c) 2014-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test BIP68 implementation."""
import
time
from
test_framework.blocktools
import
create_block
from
test_framework.messages
import
XEC
,
COutPoint
,
CTransaction
,
CTxIn
,
CTxOut
,
ToHex
from
test_framework.script
import
CScript
from
test_framework.test_framework
import
BitcoinTestFramework
from
test_framework.txtools
import
pad_tx
from
test_framework.util
import
(
assert_equal
,
assert_greater_than
,
assert_raises_rpc_error
,
satoshi_round
,
)
from
test_framework.wallet
import
MiniWallet
SEQUENCE_LOCKTIME_DISABLE_FLAG
=
1
<<
31
# this means use time (0 means height)
SEQUENCE_LOCKTIME_TYPE_FLAG
=
1
<<
22
# this is a bit-shift
SEQUENCE_LOCKTIME_GRANULARITY
=
9
SEQUENCE_LOCKTIME_MASK
=
0x0000FFFF
# RPC error for non-BIP68 final transactions
NOT_FINAL_ERROR
=
"non-BIP68-final"
class
BIP68Test
(
BitcoinTestFramework
):
def
set_test_params
(
self
):
self
.
num_nodes
=
2
self
.
extra_args
=
[
[
"-noparkdeepreorg"
,
"-acceptnonstdtxn=1"
,
],
[
"-acceptnonstdtxn=0"
,
"-automaticunparking=1"
,
],
]
def
run_test
(
self
):
self
.
relayfee
=
self
.
nodes
[
0
]
.
getnetworkinfo
()[
"relayfee"
]
self
.
wallet
=
MiniWallet
(
self
.
nodes
[
0
])
self
.
log
.
info
(
"Running test disable flag"
)
self
.
test_disable_flag
()
self
.
log
.
info
(
"Running test sequence-lock-confirmed-inputs"
)
self
.
test_sequence_lock_confirmed_inputs
()
self
.
log
.
info
(
"Running test sequence-lock-unconfirmed-inputs"
)
self
.
test_sequence_lock_unconfirmed_inputs
()
self
.
log
.
info
(
"Running test BIP68 not consensus before versionbits activation"
)
self
.
test_bip68_not_consensus
()
self
.
log
.
info
(
"Activating BIP68 (and 112/113)"
)
self
.
activateCSV
()
print
(
"Verifying nVersion=2 transactions are standard."
)
print
(
"Note that with current versions of bitcoin software, nVersion=2"
" transactions are always standard (independent of BIP68 activation"
" status)."
)
self
.
test_version2_relay
()
self
.
log
.
info
(
"Passed"
)
# Test that BIP68 is not in effect if tx version is 1, or if
# the first sequence bit is set.
def
test_disable_flag
(
self
):
utxo
=
self
.
wallet
.
send_self_transfer
(
from_node
=
self
.
nodes
[
0
])[
"new_utxo"
]
tx1
=
CTransaction
()
value
=
int
(
satoshi_round
(
utxo
[
"value"
]
-
self
.
relayfee
)
*
XEC
)
# Check that the disable flag disables relative locktime.
# If sequence locks were used, this would require 1 block for the
# input to mature.
sequence_value
=
SEQUENCE_LOCKTIME_DISABLE_FLAG
|
1
tx1
.
vin
=
[
CTxIn
(
COutPoint
(
int
(
utxo
[
"txid"
],
16
),
utxo
[
"vout"
]),
nSequence
=
sequence_value
)
]
tx1
.
vout
=
[
CTxOut
(
value
,
CScript
([
b
"a"
]))]
pad_tx
(
tx1
)
self
.
wallet
.
sign_tx
(
tx
=
tx1
)
tx1_id
=
self
.
wallet
.
sendrawtransaction
(
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx1
.
serialize
()
.
hex
()
)
tx1_id
=
int
(
tx1_id
,
16
)
# This transaction will enable sequence-locks, so this transaction should
# fail
tx2
=
CTransaction
()
tx2
.
nVersion
=
2
sequence_value
=
sequence_value
&
0x7FFFFFFF
tx2
.
vin
=
[
CTxIn
(
COutPoint
(
tx1_id
,
0
),
nSequence
=
sequence_value
)]
tx2
.
vout
=
[
CTxOut
(
int
(
value
-
self
.
relayfee
*
XEC
),
CScript
([
b
"a"
]))]
pad_tx
(
tx2
)
assert_raises_rpc_error
(
-
26
,
NOT_FINAL_ERROR
,
self
.
wallet
.
sendrawtransaction
,
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx2
.
serialize
()
.
hex
(),
)
# Setting the version back down to 1 should disable the sequence lock,
# so this should be accepted.
tx2
.
nVersion
=
1
self
.
wallet
.
sendrawtransaction
(
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx2
.
serialize
()
.
hex
()
)
# Calculate the median time past of a prior block ("confirmations" before
# the current tip).
def
get_median_time_past
(
self
,
confirmations
):
block_hash
=
self
.
nodes
[
0
]
.
getblockhash
(
self
.
nodes
[
0
]
.
getblockcount
()
-
confirmations
)
return
self
.
nodes
[
0
]
.
getblockheader
(
block_hash
)[
"mediantime"
]
# Test that sequence locks are respected for transactions spending
# confirmed inputs.
def
test_sequence_lock_confirmed_inputs
(
self
):
# Create lots of confirmed utxos, and use them to generate lots of random
# transactions.
max_outputs
=
50
while
(
len
(
self
.
wallet
.
get_utxos
(
include_immature_coinbase
=
False
,
mark_as_spent
=
False
)
)
<
200
):
import
random
num_outputs
=
random
.
randint
(
1
,
max_outputs
)
self
.
wallet
.
send_self_transfer_multi
(
from_node
=
self
.
nodes
[
0
],
num_outputs
=
num_outputs
)
self
.
generate
(
self
.
wallet
,
1
)
utxos
=
self
.
wallet
.
get_utxos
(
include_immature_coinbase
=
False
)
# Try creating a lot of random transactions.
# Each time, choose a random number of inputs, and randomly set
# some of those inputs to be sequence locked (and randomly choose
# between height/time locking). Small random chance of making the locks
# all pass.
for
_
in
range
(
400
):
# Randomly choose up to 10 inputs
num_inputs
=
random
.
randint
(
1
,
10
)
random
.
shuffle
(
utxos
)
# Track whether any sequence locks used should fail
should_pass
=
True
# Track whether this transaction was built with sequence locks
using_sequence_locks
=
False
tx
=
CTransaction
()
tx
.
nVersion
=
2
value
=
0
for
j
in
range
(
num_inputs
):
# this disables sequence locks
sequence_value
=
0xFFFFFFFE
# 50% chance we enable sequence locks
if
random
.
randint
(
0
,
1
):
using_sequence_locks
=
True
# 10% of the time, make the input sequence value pass
input_will_pass
=
random
.
randint
(
1
,
10
)
==
1
sequence_value
=
utxos
[
j
][
"confirmations"
]
if
not
input_will_pass
:
sequence_value
+=
1
should_pass
=
False
# Figure out what the median-time-past was for the confirmed input
# Note that if an input has N confirmations, we're going back N blocks
# from the tip so that we're looking up MTP of the block
# PRIOR to the one the input appears in, as per the BIP68
# spec.
orig_time
=
self
.
get_median_time_past
(
utxos
[
j
][
"confirmations"
])
# MTP of the tip
cur_time
=
self
.
get_median_time_past
(
0
)
# can only timelock this input if it's not too old --
# otherwise use height
can_time_lock
=
True
if
(
(
cur_time
-
orig_time
)
>>
SEQUENCE_LOCKTIME_GRANULARITY
)
>=
SEQUENCE_LOCKTIME_MASK
:
can_time_lock
=
False
# if time-lockable, then 50% chance we make this a time
# lock
if
random
.
randint
(
0
,
1
)
and
can_time_lock
:
# Find first time-lock value that fails, or latest one
# that succeeds
time_delta
=
sequence_value
<<
SEQUENCE_LOCKTIME_GRANULARITY
if
input_will_pass
and
time_delta
>
cur_time
-
orig_time
:
sequence_value
=
(
cur_time
-
orig_time
)
>>
SEQUENCE_LOCKTIME_GRANULARITY
elif
not
input_will_pass
and
time_delta
<=
cur_time
-
orig_time
:
sequence_value
=
(
(
cur_time
-
orig_time
)
>>
SEQUENCE_LOCKTIME_GRANULARITY
)
+
1
sequence_value
|=
SEQUENCE_LOCKTIME_TYPE_FLAG
tx
.
vin
.
append
(
CTxIn
(
COutPoint
(
int
(
utxos
[
j
][
"txid"
],
16
),
utxos
[
j
][
"vout"
]),
nSequence
=
sequence_value
,
)
)
value
+=
utxos
[
j
][
"value"
]
*
XEC
# Overestimate the size of the tx - signatures should be less than
# 120 bytes, and leave 50 for the output
tx_size
=
len
(
ToHex
(
tx
))
//
2
+
120
*
num_inputs
+
50
tx
.
vout
.
append
(
CTxOut
(
int
(
value
-
self
.
relayfee
*
tx_size
*
XEC
/
1000
),
CScript
([
b
"a"
])
)
)
self
.
wallet
.
sign_tx
(
tx
=
tx
)
pad_tx
(
tx
)
if
using_sequence_locks
and
not
should_pass
:
# This transaction should be rejected
assert_raises_rpc_error
(
-
26
,
NOT_FINAL_ERROR
,
self
.
wallet
.
sendrawtransaction
,
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx
.
serialize
()
.
hex
(),
)
else
:
# This raw transaction should be accepted
self
.
wallet
.
sendrawtransaction
(
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx
.
serialize
()
.
hex
()
)
self
.
wallet
.
rescan_utxos
()
utxos
=
self
.
wallet
.
get_utxos
(
include_immature_coinbase
=
False
)
# Test that sequence locks on unconfirmed inputs must have nSequence
# height or time of 0 to be accepted.
# Then test that BIP68-invalid transactions are removed from the mempool
# after a reorg.
def
test_sequence_lock_unconfirmed_inputs
(
self
):
# Store height so we can easily reset the chain at the end of the test
cur_height
=
self
.
nodes
[
0
]
.
getblockcount
()
# Create a mempool tx.
self
.
wallet
.
rescan_utxos
()
tx1
=
self
.
wallet
.
send_self_transfer
(
from_node
=
self
.
nodes
[
0
])[
"tx"
]
# As the fees are calculated prior to the transaction being signed,
# there is some uncertainty that calculate fee provides the correct
# minimal fee. Since regtest coins are free, let's go ahead and
# increase the fee by an order of magnitude to ensure this test
# passes.
fee_multiplier
=
10
# Anyone-can-spend mempool tx.
# Sequence lock of 0 should pass.
tx2
=
CTransaction
()
tx2
.
nVersion
=
2
tx2
.
vin
=
[
CTxIn
(
COutPoint
(
tx1
.
sha256
,
0
),
nSequence
=
0
)]
tx2
.
vout
=
[
CTxOut
(
int
(
0
),
CScript
([
b
"a"
]))]
tx2
.
vout
[
0
]
.
nValue
=
tx1
.
vout
[
0
]
.
nValue
-
fee_multiplier
*
self
.
nodes
[
0
]
.
calculate_fee
(
tx2
)
self
.
wallet
.
sign_tx
(
tx
=
tx2
)
pad_tx
(
tx2
)
tx2_raw
=
tx2
.
serialize
()
.
hex
()
self
.
wallet
.
sendrawtransaction
(
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx2_raw
)
# Create a spend of the 0th output of orig_tx with a sequence lock
# of 1, and test what happens when submitting.
# orig_tx.vout[0] must be an anyone-can-spend output
def
test_nonzero_locks
(
orig_tx
,
node
,
use_height_lock
):
sequence_value
=
1
if
not
use_height_lock
:
sequence_value
|=
SEQUENCE_LOCKTIME_TYPE_FLAG
tx
=
CTransaction
()
tx
.
nVersion
=
2
tx
.
vin
=
[
CTxIn
(
COutPoint
(
orig_tx
.
sha256
,
0
),
nSequence
=
sequence_value
)]
tx
.
vout
=
[
CTxOut
(
int
(
orig_tx
.
vout
[
0
]
.
nValue
-
fee_multiplier
*
node
.
calculate_fee
(
tx
)
),
CScript
([
b
"a"
]),
)
]
pad_tx
(
tx
)
if
orig_tx
.
hash
in
node
.
getrawmempool
():
# sendrawtransaction should fail if the tx is in the mempool
assert_raises_rpc_error
(
-
26
,
NOT_FINAL_ERROR
,
self
.
wallet
.
sendrawtransaction
,
from_node
=
node
,
tx_hex
=
tx
.
serialize
()
.
hex
(),
)
else
:
# sendrawtransaction should succeed if the tx is not in the
# mempool
self
.
wallet
.
sendrawtransaction
(
from_node
=
node
,
tx_hex
=
tx
.
serialize
()
.
hex
()
)
return
tx
test_nonzero_locks
(
tx2
,
self
.
nodes
[
0
],
use_height_lock
=
True
)
test_nonzero_locks
(
tx2
,
self
.
nodes
[
0
],
use_height_lock
=
False
)
# Now mine some blocks, but make sure tx2 doesn't get mined.
# Use prioritisetransaction to lower the effective feerate to 0
self
.
nodes
[
0
]
.
prioritisetransaction
(
txid
=
tx2
.
hash
,
fee_delta
=-
fee_multiplier
*
self
.
nodes
[
0
]
.
calculate_fee
(
tx2
)
)
cur_time
=
int
(
time
.
time
())
for
_
in
range
(
10
):
self
.
nodes
[
0
]
.
setmocktime
(
cur_time
+
600
)
self
.
generate
(
self
.
wallet
,
1
,
sync_fun
=
self
.
no_op
)
cur_time
+=
600
assert
tx2
.
hash
in
self
.
nodes
[
0
]
.
getrawmempool
()
test_nonzero_locks
(
tx2
,
self
.
nodes
[
0
],
use_height_lock
=
True
)
test_nonzero_locks
(
tx2
,
self
.
nodes
[
0
],
use_height_lock
=
False
)
# Mine tx2, and then try again
self
.
nodes
[
0
]
.
prioritisetransaction
(
txid
=
tx2
.
hash
,
fee_delta
=
fee_multiplier
*
self
.
nodes
[
0
]
.
calculate_fee
(
tx2
)
)
# Advance the time on the node so that we can test timelocks
self
.
nodes
[
0
]
.
setmocktime
(
cur_time
+
600
)
# Save block template now to use for the reorg later
tmpl
=
self
.
nodes
[
0
]
.
getblocktemplate
()
self
.
generate
(
self
.
nodes
[
0
],
1
)
assert
tx2
.
hash
not
in
self
.
nodes
[
0
]
.
getrawmempool
()
# Now that tx2 is not in the mempool, a sequence locked spend should
# succeed
tx3
=
test_nonzero_locks
(
tx2
,
self
.
nodes
[
0
],
use_height_lock
=
False
)
assert
tx3
.
hash
in
self
.
nodes
[
0
]
.
getrawmempool
()
self
.
generate
(
self
.
nodes
[
0
],
1
)
assert
tx3
.
hash
not
in
self
.
nodes
[
0
]
.
getrawmempool
()
# One more test, this time using height locks
tx4
=
test_nonzero_locks
(
tx3
,
self
.
nodes
[
0
],
use_height_lock
=
True
)
assert
tx4
.
hash
in
self
.
nodes
[
0
]
.
getrawmempool
()
# Now try combining confirmed and unconfirmed inputs
tx5
=
test_nonzero_locks
(
tx4
,
self
.
nodes
[
0
],
use_height_lock
=
True
)
assert
tx5
.
hash
not
in
self
.
nodes
[
0
]
.
getrawmempool
()
utxo
=
self
.
wallet
.
get_utxo
()
tx5
.
vin
.
append
(
CTxIn
(
COutPoint
(
int
(
utxo
[
"txid"
],
16
),
utxo
[
"vout"
]),
nSequence
=
1
)
)
tx5
.
vout
[
0
]
.
nValue
+=
int
(
utxo
[
"value"
]
*
XEC
)
self
.
wallet
.
sign_tx
(
tx
=
tx5
)
assert_raises_rpc_error
(
-
26
,
NOT_FINAL_ERROR
,
self
.
wallet
.
sendrawtransaction
,
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx5
.
serialize
()
.
hex
(),
)
# Test mempool-BIP68 consistency after reorg
#
# State of the transactions in the last blocks:
# ... -> [ tx2 ] -> [ tx3 ]
# tip-1 tip
# And currently tx4 is in the mempool.
#
# If we invalidate the tip, tx3 should get added to the mempool, causing
# tx4 to be removed (fails sequence-lock).
self
.
nodes
[
0
]
.
invalidateblock
(
self
.
nodes
[
0
]
.
getbestblockhash
())
assert
tx4
.
hash
not
in
self
.
nodes
[
0
]
.
getrawmempool
()
assert
tx3
.
hash
in
self
.
nodes
[
0
]
.
getrawmempool
()
# Now mine 2 empty blocks to reorg out the current tip (labeled tip-1 in
# diagram above).
# This would cause tx2 to be added back to the mempool, which in turn causes
# tx3 to be removed.
for
i
in
range
(
2
):
block
=
create_block
(
tmpl
=
tmpl
,
ntime
=
cur_time
)
block
.
solve
()
tip
=
block
.
sha256
assert_equal
(
None
if
i
==
1
else
"inconclusive"
,
self
.
nodes
[
0
]
.
submitblock
(
ToHex
(
block
)),
)
tmpl
=
self
.
nodes
[
0
]
.
getblocktemplate
()
tmpl
[
"previousblockhash"
]
=
f
"{tip:x}"
tmpl
[
"transactions"
]
=
[]
cur_time
+=
1
mempool
=
self
.
nodes
[
0
]
.
getrawmempool
()
assert
tx3
.
hash
not
in
mempool
assert
tx2
.
hash
in
mempool
# Reset the chain and get rid of the mocktimed-blocks
self
.
nodes
[
0
]
.
setmocktime
(
0
)
self
.
nodes
[
0
]
.
invalidateblock
(
self
.
nodes
[
0
]
.
getblockhash
(
cur_height
+
1
))
self
.
generate
(
self
.
wallet
,
10
,
sync_fun
=
self
.
no_op
)
def
get_csv_status
(
self
):
height
=
self
.
nodes
[
0
]
.
getblockchaininfo
()[
"blocks"
]
return
height
>=
576
# Make sure that BIP68 isn't being used to validate blocks, prior to
# versionbits activation. If more blocks are mined prior to this test
# being run, then it's possible the test has activated the soft fork, and
# this test should be moved to run earlier, or deleted.
def
test_bip68_not_consensus
(
self
):
assert_equal
(
self
.
get_csv_status
(),
False
)
tx1
=
self
.
wallet
.
send_self_transfer
(
from_node
=
self
.
nodes
[
0
])[
"tx"
]
# Make an anyone-can-spend transaction
tx2
=
CTransaction
()
tx2
.
nVersion
=
1
tx2
.
vin
=
[
CTxIn
(
COutPoint
(
tx1
.
sha256
,
0
),
nSequence
=
0
)]
tx2
.
vout
=
[
CTxOut
(
int
(
tx1
.
vout
[
0
]
.
nValue
-
self
.
relayfee
*
XEC
),
CScript
([
b
"a"
]))
]
# sign tx2
self
.
wallet
.
sign_tx
(
tx
=
tx2
)
pad_tx
(
tx2
)
tx2_raw
=
tx2
.
serialize
()
.
hex
()
self
.
wallet
.
sendrawtransaction
(
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx2_raw
)
# Now make an invalid spend of tx2 according to BIP68
# 100 block relative locktime
sequence_value
=
100
tx3
=
CTransaction
()
tx3
.
nVersion
=
2
tx3
.
vin
=
[
CTxIn
(
COutPoint
(
tx2
.
sha256
,
0
),
nSequence
=
sequence_value
)]
tx3
.
vout
=
[
CTxOut
(
int
(
tx2
.
vout
[
0
]
.
nValue
-
self
.
relayfee
*
XEC
),
CScript
([
b
"a"
]))
]
pad_tx
(
tx3
)
assert_raises_rpc_error
(
-
26
,
NOT_FINAL_ERROR
,
self
.
wallet
.
sendrawtransaction
,
from_node
=
self
.
nodes
[
0
],
tx_hex
=
tx3
.
serialize
()
.
hex
(),
)
# make a block that violates bip68; ensure that the tip updates
block
=
create_block
(
tmpl
=
self
.
nodes
[
0
]
.
getblocktemplate
(),
txlist
=
[
tx1
,
tx2
,
tx3
]
)
block
.
solve
()
assert_equal
(
None
,
self
.
nodes
[
0
]
.
submitblock
(
ToHex
(
block
)))
assert_equal
(
self
.
nodes
[
0
]
.
getbestblockhash
(),
block
.
hash
)
def
activateCSV
(
self
):
# activation should happen at block height 576
csv_activation_height
=
576
height
=
self
.
nodes
[
0
]
.
getblockcount
()
assert_greater_than
(
csv_activation_height
-
height
,
1
)
self
.
generate
(
self
.
wallet
,
csv_activation_height
-
height
-
1
,
sync_fun
=
self
.
no_op
)
assert_equal
(
self
.
get_csv_status
(),
False
)
self
.
disconnect_nodes
(
0
,
1
)
self
.
generate
(
self
.
wallet
,
1
,
sync_fun
=
self
.
no_op
)
assert_equal
(
self
.
get_csv_status
(),
True
)
# We have a block that has CSV activated, but we want to be at
# the activation point, so we invalidate the tip.
self
.
nodes
[
0
]
.
invalidateblock
(
self
.
nodes
[
0
]
.
getbestblockhash
())
self
.
connect_nodes
(
0
,
1
)
self
.
sync_blocks
()
# Use self.nodes[1] to test that version 2 transactions are standard.
def
test_version2_relay
(
self
):
mini_wallet
=
MiniWallet
(
self
.
nodes
[
1
])
mini_wallet
.
rescan_utxos
()
tx
=
mini_wallet
.
create_self_transfer
()[
"tx"
]
tx
.
nVersion
=
2
mini_wallet
.
sendrawtransaction
(
from_node
=
self
.
nodes
[
1
],
tx_hex
=
tx
.
serialize
()
.
hex
()
)
if
__name__
==
"__main__"
:
BIP68Test
()
.
main
()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Mon, Nov 17, 02:35 (1 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
7107307
Default Alt Text
feature_bip68_sequence.py (19 KB)
Attached To
rSTAGING Bitcoin ABC staging
Event Timeline
Log In to Comment