Page MenuHomePhabricator

No OneTemporary

diff --git a/test/functional/abc_p2p_proof_inventory.py b/test/functional/abc_p2p_proof_inventory.py
index 5eba81af7..f80e5a7c3 100644
--- a/test/functional/abc_p2p_proof_inventory.py
+++ b/test/functional/abc_p2p_proof_inventory.py
@@ -1,355 +1,355 @@
#!/usr/bin/env python3
# Copyright (c) 2021 The Bitcoin developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test proof inventory relaying
"""
import time
from test_framework.address import ADDRESS_ECREG_UNSPENDABLE
from test_framework.avatools import (
AvaP2PInterface,
avalanche_proof_from_hex,
gen_proof,
get_proof_ids,
wait_for_proof,
)
from test_framework.messages import (
MSG_AVA_PROOF,
MSG_TYPE_MASK,
CInv,
msg_avaproof,
msg_getdata,
)
from test_framework.p2p import P2PInterface, p2p_lock
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
assert_greater_than,
assert_raises_rpc_error,
uint256_hex,
)
from test_framework.wallet_util import bytes_to_wif
# Broadcast reattempt occurs every 10 to 15 minutes
MAX_INITIAL_BROADCAST_DELAY = 15 * 60
# Delay to allow the node to respond to getdata requests
UNCONDITIONAL_RELAY_DELAY = 2 * 60
class ProofInvStoreP2PInterface(P2PInterface):
def __init__(self):
super().__init__()
self.proof_invs_counter = 0
self.last_proofid = None
def on_inv(self, message):
for i in message.inv:
if i.type & MSG_TYPE_MASK == MSG_AVA_PROOF:
self.proof_invs_counter += 1
self.last_proofid = i.hash
class ProofInventoryTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 5
self.extra_args = [[
'-avaproofstakeutxodustthreshold=1000000',
'-avaproofstakeutxoconfirmations=2',
'-avacooldown=0',
'-whitelist=noban@127.0.0.1',
]] * self.num_nodes
def generate_proof(self, node, mature=True):
privkey, proof = gen_proof(self, node)
if mature:
self.generate(node, 1, sync_fun=self.no_op)
return privkey, proof
def test_send_proof_inv(self):
self.log.info("Test sending a proof to our peers")
node = self.nodes[0]
for _ in range(10):
node.add_p2p_connection(ProofInvStoreP2PInterface())
_, proof = self.generate_proof(node)
assert node.sendavalancheproof(proof.serialize().hex())
def proof_inv_found(peer):
with p2p_lock:
return peer.last_proofid == proof.proofid
self.wait_until(lambda: all(proof_inv_found(i) for i in node.p2ps))
self.log.info("Test that we don't send the same inv several times")
extra_peer = ProofInvStoreP2PInterface()
node.add_p2p_connection(extra_peer)
# Send the same proof one more time
node.sendavalancheproof(proof.serialize().hex())
# Our new extra peer should receive it but not the others
self.wait_until(lambda: proof_inv_found(extra_peer))
assert all(p.proof_invs_counter == 1 for p in node.p2ps)
# Send the proof again and force the send loop to be processed
for peer in node.p2ps:
node.sendavalancheproof(proof.serialize().hex())
peer.sync_with_ping()
assert all(p.proof_invs_counter == 1 for p in node.p2ps)
def test_receive_proof(self):
self.log.info("Test a peer is created on proof reception")
node = self.nodes[0]
_, proof = self.generate_proof(node)
peer = node.add_p2p_connection(P2PInterface())
msg = msg_avaproof()
msg.proof = proof
peer.send_message(msg)
self.wait_until(lambda: proof.proofid in get_proof_ids(node))
self.log.info("Test receiving a proof with an immature utxo")
_, immature = self.generate_proof(node, mature=False)
immature_proofid = uint256_hex(immature.proofid)
msg = msg_avaproof()
msg.proof = immature
peer.send_message(msg)
wait_for_proof(node, immature_proofid, expect_status="immature")
def test_ban_invalid_proof(self):
node = self.nodes[0]
_, bad_proof = self.generate_proof(node)
bad_proof.stakes = []
privkey = node.get_deterministic_priv_key().key
missing_stake = node.buildavalancheproof(
1, 0, privkey, [{
'txid': '0' * 64,
'vout': 0,
'amount': 10000000,
'height': 42,
'iscoinbase': False,
'privatekey': privkey,
}]
)
self.restart_node(
0, ['-avaproofstakeutxodustthreshold=1000000'])
peer = node.add_p2p_connection(P2PInterface())
msg = msg_avaproof()
# Sending a proof with a missing utxo doesn't trigger a ban
msg.proof = avalanche_proof_from_hex(missing_stake)
with node.assert_debug_log(["received: avaproof"], ["Misbehaving"]):
peer.send_message(msg)
peer.sync_with_ping()
msg.proof = bad_proof
with node.assert_debug_log([
'Misbehaving',
'invalid-proof',
]):
peer.send_message(msg)
peer.wait_for_disconnect()
def test_proof_relay(self):
# This test makes no sense with less than 2 nodes !
assert_greater_than(self.num_nodes, 2)
proofs_keys = [self.generate_proof(self.nodes[0]) for _ in self.nodes]
proofids = {proof_key[1].proofid for proof_key in proofs_keys}
# generate_proof does not sync, so do it manually
self.sync_blocks()
def restart_nodes_with_proof(nodes, extra_args=None):
for node in nodes:
privkey, proof = proofs_keys[node.index]
self.restart_node(node.index, self.extra_args[node.index] + [
f"-avaproof={proof.serialize().hex()}",
f"-avamasterkey={bytes_to_wif(privkey.get_bytes())}"
] + (extra_args or []))
restart_nodes_with_proof(self.nodes[:-1])
chainwork = int(self.nodes[-1].getblockchaininfo()['chainwork'], 16)
restart_nodes_with_proof(
self.nodes[-1:], extra_args=[f'-minimumchainwork={chainwork + 100:#x}'])
# Add an inbound so the node proof can be registered and advertised
[node.add_p2p_connection(P2PInterface()) for node in self.nodes]
[[self.connect_nodes(node.index, j)
for j in range(node.index)] for node in self.nodes]
# Connect a block to make the proofs added to our pool
self.generate(self.nodes[0], 1, sync_fun=self.sync_blocks)
self.log.info("Nodes should eventually get the proof from their peer")
self.sync_proofs(self.nodes[:-1])
for node in self.nodes[:-1]:
assert_equal(set(get_proof_ids(node)), proofids)
assert self.nodes[-1].getblockchaininfo()['initialblockdownload']
self.log.info("Except the node that has not completed IBD")
assert_equal(len(get_proof_ids(self.nodes[-1])), 1)
# The same if we send a proof directly with no download request
peer = AvaP2PInterface()
self.nodes[-1].add_p2p_connection(peer)
_, proof = self.generate_proof(self.nodes[0])
peer.send_avaproof(proof)
peer.sync_send_with_ping()
with p2p_lock:
assert_equal(peer.message_count.get('getdata', 0), 0)
# Leave the nodes in good shape for the next tests
restart_nodes_with_proof(self.nodes)
[[self.connect_nodes(node.index, j)
for j in range(node.index)] for node in self.nodes]
def test_manually_sent_proof(self):
node0 = self.nodes[0]
_, proof = self.generate_proof(node0)
self.log.info(
"Send a proof via RPC and check all the nodes download it")
node0.sendavalancheproof(proof.serialize().hex())
self.sync_proofs()
def test_unbroadcast(self):
self.log.info("Test broadcasting proofs")
node = self.nodes[0]
# Disconnect the other nodes/peers, or they will request the proof and
# invalidate the test
[n.stop_node() for n in self.nodes[1:]]
node.disconnect_p2ps()
def add_peers(count):
peers = []
for i in range(count):
peer = node.add_p2p_connection(ProofInvStoreP2PInterface())
peer.wait_for_verack()
peers.append(peer)
return peers
_, proof = self.generate_proof(node)
proofid_hex = uint256_hex(proof.proofid)
# Broadcast the proof
peers = add_peers(3)
assert node.sendavalancheproof(proof.serialize().hex())
wait_for_proof(node, proofid_hex)
def proof_inv_received(peers):
with p2p_lock:
return all(p.last_message.get(
"inv") and p.last_message["inv"].inv[-1].hash == proof.proofid for p in peers)
self.wait_until(lambda: proof_inv_received(peers))
# If no peer request the proof for download, the node should reattempt
# broadcasting to all new peers after 10 to 15 minutes.
peers = add_peers(3)
node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1)
peers[-1].sync_with_ping()
self.wait_until(lambda: proof_inv_received(peers))
# If at least one peer requests the proof, there is no more attempt to
# broadcast it
node.setmocktime(int(time.time()) + UNCONDITIONAL_RELAY_DELAY)
msg = msg_getdata([CInv(t=MSG_AVA_PROOF, h=proof.proofid)])
peers[-1].send_message(msg)
# Give enough time for the node to broadcast the proof again
peers = add_peers(3)
node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1)
peers[-1].sync_with_ping()
assert not proof_inv_received(peers)
self.log.info(
"Proofs that become invalid should no longer be broadcasted")
# Restart and add connect a new set of peers
self.restart_node(0)
# Broadcast the proof
peers = add_peers(3)
assert node.sendavalancheproof(proof.serialize().hex())
self.wait_until(lambda: proof_inv_received(peers))
# Sanity check our node knows the proof, and it is valid
wait_for_proof(node, proofid_hex)
# Mature the utxo then spend it
self.generate(node, 100, sync_fun=self.no_op)
utxo = proof.stakes[0].stake.utxo
raw_tx = node.createrawtransaction(
inputs=[{
# coinbase
- "txid": uint256_hex(utxo.hash),
+ "txid": uint256_hex(utxo.txid),
"vout": utxo.n
}],
outputs={ADDRESS_ECREG_UNSPENDABLE: 25_000_000 - 250.00},
)
signed_tx = node.signrawtransactionwithkey(
hexstring=raw_tx,
privkeys=[node.get_deterministic_priv_key().key],
)
node.sendrawtransaction(signed_tx['hex'])
# Mine the tx in a block
self.generate(node, 1, sync_fun=self.no_op)
# Wait for the proof to be invalidated
def check_proof_not_found(proofid):
try:
assert_raises_rpc_error(-8,
"Proof not found",
node.getrawavalancheproof,
proofid)
return True
except BaseException:
return False
self.wait_until(lambda: check_proof_not_found(proofid_hex))
# It should no longer be broadcasted
peers = add_peers(3)
node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1)
peers[-1].sync_with_ping()
assert not proof_inv_received(peers)
def run_test(self):
self.test_send_proof_inv()
self.test_receive_proof()
self.test_proof_relay()
self.test_manually_sent_proof()
# Run these tests last because they need to disconnect the nodes
self.test_unbroadcast()
self.test_ban_invalid_proof()
if __name__ == '__main__':
ProofInventoryTest().main()
diff --git a/test/functional/data/invalid_txs.py b/test/functional/data/invalid_txs.py
index 7f8cd4062..14f8103d9 100644
--- a/test/functional/data/invalid_txs.py
+++ b/test/functional/data/invalid_txs.py
@@ -1,262 +1,262 @@
#!/usr/bin/env python3
# Copyright (c) 2015-2018 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Templates for constructing various sorts of invalid transactions.
These templates (or an iterator over all of them) can be reused in different
contexts to test using a number of invalid transaction types.
Hopefully this makes it easier to get coverage of a full variety of tx
validation checks through different interfaces (AcceptBlock, AcceptToMemPool,
etc.) without repeating ourselves.
Invalid tx cases not covered here can be found by running:
$ diff \
<(grep -IREho "bad-txns[a-zA-Z-]+" src | sort -u) \
<(grep -IEho "bad-txns[a-zA-Z-]+" test/functional/data/invalid_txs.py | sort -u)
"""
import abc
from typing import Optional
from test_framework import script as sc
from test_framework.blocktools import create_tx_with_script
from test_framework.messages import MAX_MONEY, COutPoint, CTransaction, CTxIn, CTxOut
from test_framework.script import (
OP_2DIV,
OP_2MUL,
OP_INVERT,
OP_LSHIFT,
OP_MUL,
OP_RSHIFT,
CScript,
)
from test_framework.txtools import pad_tx
basic_p2sh = sc.CScript(
[sc.OP_HASH160, sc.hash160(sc.CScript([sc.OP_0])), sc.OP_EQUAL])
class BadTxTemplate:
"""Allows simple construction of a certain kind of invalid tx. Base class to be subclassed."""
__metaclass__ = abc.ABCMeta
# The expected error code given by bitcoind upon submission of the tx.
reject_reason: Optional[str] = ""
# Only specified if it differs from mempool acceptance error.
block_reject_reason = ""
# Do we expect to be disconnected after submitting this tx?
expect_disconnect = False
# Is this tx considered valid when included in a block, but not for acceptance into
# the mempool (i.e. does it violate policy but not consensus)?
valid_in_block = False
def __init__(self, *, spend_tx=None, spend_block=None):
self.spend_tx = spend_block.vtx[0] if spend_block else spend_tx
self.spend_avail = sum(o.nValue for o in self.spend_tx.vout)
self.valid_txin = CTxIn(
COutPoint(
self.spend_tx.sha256,
0),
b"",
0xffffffff)
@abc.abstractmethod
def get_tx(self, *args, **kwargs):
"""Return a CTransaction that is invalid per the subclass."""
pass
class OutputMissing(BadTxTemplate):
reject_reason = "bad-txns-vout-empty"
expect_disconnect = True
def get_tx(self):
tx = CTransaction()
tx.vin.append(self.valid_txin)
tx.calc_sha256()
return tx
class InputMissing(BadTxTemplate):
reject_reason = "bad-txns-vin-empty"
expect_disconnect = True
def get_tx(self):
tx = CTransaction()
tx.vout.append(CTxOut(0, sc.CScript([sc.OP_TRUE] * 100)))
tx.calc_sha256()
return tx
class SizeTooSmall(BadTxTemplate):
reject_reason = "bad-txns-undersize"
expect_disconnect = False
valid_in_block = True
def get_tx(self):
tx = CTransaction()
tx.vin.append(self.valid_txin)
tx.vout.append(CTxOut(0, sc.CScript([sc.OP_TRUE])))
tx.calc_sha256()
return tx
class BadInputOutpointIndex(BadTxTemplate):
# Won't be rejected - nonexistent outpoint index is treated as an orphan since the coins
# database can't distinguish between spent outpoints and outpoints which
# never existed.
reject_reason = None
expect_disconnect = False
def get_tx(self):
num_indices = len(self.spend_tx.vin)
bad_idx = num_indices + 100
tx = CTransaction()
tx.vin.append(
CTxIn(
COutPoint(
self.spend_tx.sha256,
bad_idx),
b"",
0xffffffff))
tx.vout.append(CTxOut(0, basic_p2sh))
tx.calc_sha256()
return tx
class DuplicateInput(BadTxTemplate):
reject_reason = 'bad-txns-inputs-duplicate'
expect_disconnect = True
def get_tx(self):
tx = CTransaction()
tx.vin.append(self.valid_txin)
tx.vin.append(self.valid_txin)
tx.vout.append(CTxOut(1, basic_p2sh))
tx.calc_sha256()
return tx
class PrevoutNullInput(BadTxTemplate):
reject_reason = 'bad-txns-prevout-null'
expect_disconnect = True
def get_tx(self):
tx = CTransaction()
tx.vin.append(self.valid_txin)
- tx.vin.append(CTxIn(COutPoint(hash=0, n=0xffffffff)))
+ tx.vin.append(CTxIn(COutPoint(txid=0, n=0xffffffff)))
tx.vout.append(CTxOut(1, basic_p2sh))
tx.calc_sha256()
return tx
class NonexistentInput(BadTxTemplate):
# Added as an orphan tx.
reject_reason = None
expect_disconnect = False
def get_tx(self):
tx = CTransaction()
tx.vin.append(
CTxIn(
COutPoint(
self.spend_tx.sha256 +
1,
0),
b"",
0xffffffff))
tx.vin.append(self.valid_txin)
tx.vout.append(CTxOut(1, basic_p2sh))
tx.calc_sha256()
return tx
class SpendTooMuch(BadTxTemplate):
reject_reason = 'bad-txns-in-belowout'
expect_disconnect = True
def get_tx(self):
return create_tx_with_script(
self.spend_tx, 0, script_pub_key=basic_p2sh, amount=(self.spend_avail + 1))
class CreateNegative(BadTxTemplate):
reject_reason = 'bad-txns-vout-negative'
expect_disconnect = True
def get_tx(self):
return create_tx_with_script(self.spend_tx, 0, amount=-1)
class CreateTooLarge(BadTxTemplate):
reject_reason = 'bad-txns-vout-toolarge'
expect_disconnect = True
def get_tx(self):
return create_tx_with_script(self.spend_tx, 0, amount=MAX_MONEY + 1)
class CreateSumTooLarge(BadTxTemplate):
reject_reason = 'bad-txns-txouttotal-toolarge'
expect_disconnect = True
def get_tx(self):
tx = create_tx_with_script(self.spend_tx, 0, amount=MAX_MONEY)
tx.vout = [tx.vout[0]] * 2
tx.calc_sha256()
return tx
class InvalidOPIFConstruction(BadTxTemplate):
reject_reason = "mandatory-script-verify-flag-failed (Invalid OP_IF construction)"
expect_disconnect = True
valid_in_block = True
def get_tx(self):
return create_tx_with_script(
self.spend_tx, 0, script_sig=b'\x64' * 35,
amount=(self.spend_avail // 2))
def getDisabledOpcodeTemplate(opcode):
""" Creates disabled opcode tx template class"""
def get_tx(self):
tx = CTransaction()
vin = self.valid_txin
vin.scriptSig = CScript([opcode])
tx.vin.append(vin)
tx.vout.append(CTxOut(1, basic_p2sh))
pad_tx(tx)
tx.calc_sha256()
return tx
return type(f"DisabledOpcode_{str(opcode)}", (BadTxTemplate,), {
'reject_reason': "disabled opcode",
'expect_disconnect': True,
'get_tx': get_tx,
'valid_in_block': True
})
# Disabled opcode tx templates (CVE-2010-5137)
DisabledOpcodeTemplates = [getDisabledOpcodeTemplate(opcode) for opcode in [
OP_INVERT,
OP_2MUL,
OP_2DIV,
OP_MUL,
OP_LSHIFT,
OP_RSHIFT]]
def iter_all_templates():
"""Iterate through all bad transaction template types."""
return BadTxTemplate.__subclasses__()
diff --git a/test/functional/feature_utxo_set_hash.py b/test/functional/feature_utxo_set_hash.py
index 3b072abca..d6fe6971f 100755
--- a/test/functional/feature_utxo_set_hash.py
+++ b/test/functional/feature_utxo_set_hash.py
@@ -1,91 +1,91 @@
#!/usr/bin/env python3
# Copyright (c) 2020-2021 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test UTXO set hash value calculation in gettxoutsetinfo."""
import struct
from test_framework.blocktools import create_transaction
from test_framework.messages import CBlock, COutPoint, FromHex
from test_framework.muhash import MuHash3072
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal
class UTXOSetHashTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.setup_clean_chain = True
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def test_deterministic_hash_results(self):
self.log.info("Test deterministic UTXO set hash results")
# These depend on the setup_clean_chain option, the chain loaded from
# the cache
assert_equal(
self.nodes[0].gettxoutsetinfo()['hash_serialized'],
"b32ec1dda5a53cd025b95387aad344a801825fe46a60ff952ce26528f01d3be8")
assert_equal(
self.nodes[0].gettxoutsetinfo("muhash")['muhash'],
"dd5ad2a105c2d29495f577245c357409002329b9f4d6182c0af3dc2f462555c8")
def test_muhash_implementation(self):
self.log.info("Test MuHash implementation consistency")
node = self.nodes[0]
# Generate 100 blocks and remove the first since we plan to spend its
# coinbase
block_hashes = self.generate(node, 100)
blocks = [
FromHex(CBlock(), node.getblock(block, False)) for block in block_hashes]
spending = blocks.pop(0)
# Create a spending transaction and mine a block which includes it
tx = create_transaction(
node, spending.vtx[0].rehash(), node.getnewaddress(),
amount=49_000_000)
txid = node.sendrawtransaction(
hexstring=tx.serialize().hex(), maxfeerate=0)
tx_block = self.generateblock(node,
output=node.getnewaddress(),
transactions=[txid])
blocks.append(
FromHex(CBlock(), node.getblock(tx_block['hash'], False)))
# Serialize the outputs that should be in the UTXO set and add them to
# a MuHash object
muhash = MuHash3072()
for height, block in enumerate(blocks):
# The Genesis block coinbase is not part of the UTXO set and we
# spent the first mined block
height += 2
for tx in block.vtx:
for n, tx_out in enumerate(tx.vout):
- coinbase = 1 if not tx.vin[0].prevout.hash else 0
+ coinbase = 1 if not tx.vin[0].prevout.txid else 0
data = COutPoint(int(tx.rehash(), 16), n).serialize()
data += struct.pack("<i", height * 2 + coinbase)
data += tx_out.serialize()
muhash.insert(data)
finalized = muhash.digest()
node_muhash = node.gettxoutsetinfo("muhash")['muhash']
assert_equal(finalized[::-1].hex(), node_muhash)
def run_test(self):
self.test_deterministic_hash_results()
self.test_muhash_implementation()
if __name__ == '__main__':
UTXOSetHashTest().main()
diff --git a/test/functional/mempool_accept.py b/test/functional/mempool_accept.py
index 7b2494e5e..ad4722215 100755
--- a/test/functional/mempool_accept.py
+++ b/test/functional/mempool_accept.py
@@ -1,400 +1,400 @@
#!/usr/bin/env python3
# Copyright (c) 2017-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test mempool acceptance of raw transactions."""
from decimal import Decimal
from test_framework.key import ECKey
from test_framework.messages import (
MAX_BLOCK_BASE_SIZE,
MAX_MONEY,
XEC,
COutPoint,
CTransaction,
CTxIn,
CTxOut,
FromHex,
ToHex,
)
from test_framework.script import (
OP_0,
OP_2,
OP_3,
OP_CHECKMULTISIG,
OP_EQUAL,
OP_HASH160,
OP_RETURN,
CScript,
hash160,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
class MempoolAcceptanceTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 1
self.extra_args = [[
'-txindex',
'-acceptnonstdtxn=0', # Try to mimic main-net
'-permitbaremultisig=0',
]] * self.num_nodes
self.supports_cli = False
def skip_test_if_missing_module(self):
self.skip_if_no_wallet()
def check_mempool_result(self, result_expected, *args, **kwargs):
"""Wrapper to check result of testmempoolaccept on node_0's mempool"""
result_test = self.nodes[0].testmempoolaccept(*args, **kwargs)
assert_equal(result_expected, result_test)
# Must not change mempool state
assert_equal(self.nodes[0].getmempoolinfo()['size'], self.mempool_size)
def run_test(self):
node = self.nodes[0]
self.log.info('Start with empty mempool, and 200 blocks')
self.mempool_size = 0
assert_equal(node.getblockcount(), 200)
assert_equal(node.getmempoolinfo()['size'], self.mempool_size)
coins = node.listunspent()
self.log.info('Should not accept garbage to testmempoolaccept')
assert_raises_rpc_error(-3, 'Expected type array, got string',
lambda: node.testmempoolaccept(rawtxs='ff00baar'))
assert_raises_rpc_error(-8, 'Array must contain between 1 and 50 transactions.',
lambda: node.testmempoolaccept(rawtxs=['ff22'] * 51))
assert_raises_rpc_error(-8, 'Array must contain between 1 and 50 transactions.',
lambda: node.testmempoolaccept(rawtxs=[]))
assert_raises_rpc_error(-22, 'TX decode failed',
lambda: node.testmempoolaccept(rawtxs=['ff00baar']))
self.log.info('A transaction already in the blockchain')
# Pick a random coin(base) to spend
coin = coins.pop()
raw_tx_in_block = node.signrawtransactionwithwallet(node.createrawtransaction(
inputs=[{'txid': coin['txid'], 'vout': coin['vout']}],
outputs=[{node.getnewaddress(): 300000},
{node.getnewaddress(): 49000000}],
))['hex']
txid_in_block = node.sendrawtransaction(
hexstring=raw_tx_in_block, maxfeerate=0)
self.generate(node, 1)
self.mempool_size = 0
self.check_mempool_result(
result_expected=[{'txid': txid_in_block, 'allowed': False,
'reject-reason': 'txn-already-known'}],
rawtxs=[raw_tx_in_block],
)
self.log.info('A transaction not in the mempool')
fee = Decimal("7.00")
raw_tx_0 = node.signrawtransactionwithwallet(node.createrawtransaction(
inputs=[{"txid": txid_in_block, "vout": 0,
"sequence": 0xfffffffd}],
outputs=[{node.getnewaddress(): Decimal(300_000) - fee}],
))['hex']
tx = FromHex(CTransaction(), raw_tx_0)
txid_0 = tx.rehash()
self.check_mempool_result(
result_expected=[{'txid': txid_0, 'allowed': True,
'size': tx.billable_size(),
'fees': {'base': fee}}],
rawtxs=[raw_tx_0],
)
self.log.info('A final transaction not in the mempool')
# Pick a random coin(base) to spend
coin = coins.pop()
output_amount = Decimal(25_000)
raw_tx_final = node.signrawtransactionwithwallet(node.createrawtransaction(
inputs=[{'txid': coin['txid'], 'vout': coin['vout'],
"sequence": 0xffffffff}], # SEQUENCE_FINAL
outputs=[{node.getnewaddress(): output_amount}],
locktime=node.getblockcount() + 2000, # Can be anything
))['hex']
tx = FromHex(CTransaction(), raw_tx_final)
fee_expected = coin['amount'] - output_amount
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(), 'allowed': True,
'size': tx.billable_size(),
'fees': {'base': fee_expected}}],
rawtxs=[tx.serialize().hex()],
maxfeerate=0,
)
node.sendrawtransaction(hexstring=raw_tx_final, maxfeerate=0)
self.mempool_size += 1
self.log.info('A transaction in the mempool')
node.sendrawtransaction(hexstring=raw_tx_0)
self.mempool_size += 1
self.check_mempool_result(
result_expected=[{'txid': txid_0, 'allowed': False,
'reject-reason': 'txn-already-in-mempool'}],
rawtxs=[raw_tx_0],
)
# Removed RBF test
# self.log.info('A transaction that replaces a mempool transaction')
# ...
self.log.info('A transaction that conflicts with an unconfirmed tx')
# Send the transaction that conflicts with the mempool transaction
node.sendrawtransaction(hexstring=tx.serialize().hex(), maxfeerate=0)
# take original raw_tx_0
tx = FromHex(CTransaction(), raw_tx_0)
tx.vout[0].nValue -= int(4 * fee * XEC) # Set more fee
# skip re-signing the tx
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(),
'allowed': False,
'reject-reason': 'txn-mempool-conflict'}],
rawtxs=[tx.serialize().hex()],
maxfeerate=0,
)
self.log.info('A transaction with missing inputs, that never existed')
tx = FromHex(CTransaction(), raw_tx_0)
- tx.vin[0].prevout = COutPoint(hash=int('ff' * 32, 16), n=14)
+ tx.vin[0].prevout = COutPoint(txid=int('ff' * 32, 16), n=14)
# skip re-signing the tx
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'missing-inputs'}],
rawtxs=[ToHex(tx)],
)
self.log.info(
'A transaction with missing inputs, that existed once in the past')
tx = FromHex(CTransaction(), raw_tx_0)
# Set vout to 1, to spend the other outpoint (49 coins) of the
# in-chain-tx we want to double spend
tx.vin[0].prevout.n = 1
raw_tx_1 = node.signrawtransactionwithwallet(
ToHex(tx))['hex']
txid_1 = node.sendrawtransaction(hexstring=raw_tx_1, maxfeerate=0)
# Now spend both to "clearly hide" the outputs, ie. remove the coins
# from the utxo set by spending them
raw_tx_spend_both = node.signrawtransactionwithwallet(node.createrawtransaction(
inputs=[
{'txid': txid_0, 'vout': 0},
{'txid': txid_1, 'vout': 0},
],
outputs=[{node.getnewaddress(): 100000}]
))['hex']
txid_spend_both = node.sendrawtransaction(
hexstring=raw_tx_spend_both, maxfeerate=0)
self.generate(node, 1)
self.mempool_size = 0
# Now see if we can add the coins back to the utxo set by sending the
# exact txs again
self.check_mempool_result(
result_expected=[
{'txid': txid_0, 'allowed': False, 'reject-reason': 'missing-inputs'}],
rawtxs=[raw_tx_0],
)
self.check_mempool_result(
result_expected=[
{'txid': txid_1, 'allowed': False, 'reject-reason': 'missing-inputs'}],
rawtxs=[raw_tx_1],
)
self.log.info('Create a signed "reference" tx for later use')
raw_tx_reference = node.signrawtransactionwithwallet(node.createrawtransaction(
inputs=[{'txid': txid_spend_both, 'vout': 0}],
outputs=[{node.getnewaddress(): 50000}],
))['hex']
tx = FromHex(CTransaction(), raw_tx_reference)
# Reference tx should be valid on itself
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(), 'allowed': True,
'size': tx.billable_size(),
'fees': {'base': Decimal(100_000 - 50_000)}}],
rawtxs=[ToHex(tx)],
maxfeerate=0,
)
self.log.info('A transaction with no outputs')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout = []
# Skip re-signing the transaction for context independent checks from now on
# FromHex(tx, node.signrawtransactionwithwallet(ToHex(tx))['hex'])
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(
), 'allowed': False, 'reject-reason': 'bad-txns-vout-empty'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A really large transaction')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vin = [tx.vin[0]] * (1 + MAX_BLOCK_BASE_SIZE
// len(tx.vin[0].serialize()))
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'bad-txns-oversize'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A transaction with negative output value')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout[0].nValue *= -1
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(
), 'allowed': False, 'reject-reason': 'bad-txns-vout-negative'}],
rawtxs=[ToHex(tx)],
)
# The following two validations prevent overflow of the output amounts
# (see CVE-2010-5139).
self.log.info('A transaction with too large output value')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout[0].nValue = MAX_MONEY + 1
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(
), 'allowed': False, 'reject-reason': 'bad-txns-vout-toolarge'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A transaction with too large sum of output values')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout = [tx.vout[0]] * 2
tx.vout[0].nValue = MAX_MONEY
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(
), 'allowed': False, 'reject-reason': 'bad-txns-txouttotal-toolarge'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A transaction with duplicate inputs')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vin = [tx.vin[0]] * 2
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(
), 'allowed': False, 'reject-reason': 'bad-txns-inputs-duplicate'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A non-coinbase transaction with coinbase-like outpoint')
tx = FromHex(CTransaction(), raw_tx_reference)
- tx.vin.append(CTxIn(COutPoint(hash=0, n=0xffffffff)))
+ tx.vin.append(CTxIn(COutPoint(txid=0, n=0xffffffff)))
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(),
'allowed': False,
'reject-reason': 'bad-txns-prevout-null'}],
rawtxs=[tx.serialize().hex()],
)
self.log.info('A coinbase transaction')
# Pick the input of the first tx we signed, so it has to be a coinbase
# tx
raw_tx_coinbase_spent = node.getrawtransaction(
txid=node.decoderawtransaction(hexstring=raw_tx_in_block)['vin'][0]['txid'])
tx = FromHex(CTransaction(), raw_tx_coinbase_spent)
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'bad-tx-coinbase'}],
rawtxs=[ToHex(tx)],
)
self.log.info('Some nonstandard transactions')
tx = FromHex(CTransaction(), raw_tx_reference)
tx.nVersion = 3 # A version currently non-standard
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'version'}],
rawtxs=[ToHex(tx)],
)
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout[0].scriptPubKey = CScript([OP_0]) # Some non-standard script
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'scriptpubkey'}],
rawtxs=[ToHex(tx)],
)
tx = FromHex(CTransaction(), raw_tx_reference)
key = ECKey()
key.generate()
pubkey = key.get_pubkey().get_bytes()
# Some bare multisig script (2-of-3)
tx.vout[0].scriptPubKey = CScript(
[OP_2, pubkey, pubkey, pubkey, OP_3, OP_CHECKMULTISIG])
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(), 'allowed': False,
'reject-reason': 'bare-multisig'}],
rawtxs=[tx.serialize().hex()],
)
tx = FromHex(CTransaction(), raw_tx_reference)
# Some not-pushonly scriptSig
tx.vin[0].scriptSig = CScript([OP_HASH160])
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(
), 'allowed': False, 'reject-reason': 'scriptsig-not-pushonly'}],
rawtxs=[ToHex(tx)],
)
tx = FromHex(CTransaction(), raw_tx_reference)
# Some too large scriptSig (>1650 bytes)
tx.vin[0].scriptSig = CScript([b'a' * 1648])
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(), 'allowed': False,
'reject-reason': 'scriptsig-size'}],
rawtxs=[tx.serialize().hex()],
)
tx = FromHex(CTransaction(), raw_tx_reference)
output_p2sh_burn = CTxOut(nValue=540, scriptPubKey=CScript(
[OP_HASH160, hash160(b'burn'), OP_EQUAL]))
# Use enough outputs to make the tx too large for our policy
num_scripts = 100000 // len(output_p2sh_burn.serialize())
tx.vout = [output_p2sh_burn] * num_scripts
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'tx-size'}],
rawtxs=[ToHex(tx)],
)
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout[0] = output_p2sh_burn
# Make output smaller, such that it is dust for our policy
tx.vout[0].nValue -= 1
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'dust'}],
rawtxs=[ToHex(tx)],
)
tx = FromHex(CTransaction(), raw_tx_reference)
tx.vout[0].scriptPubKey = CScript([OP_RETURN, b'\xff'])
tx.vout = [tx.vout[0]] * 2
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'multi-op-return'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A timelocked transaction')
tx = FromHex(CTransaction(), raw_tx_reference)
# Should be non-max, so locktime is not ignored
tx.vin[0].nSequence -= 1
tx.nLockTime = node.getblockcount() + 1
self.check_mempool_result(
result_expected=[
{'txid': tx.rehash(), 'allowed': False, 'reject-reason': 'bad-txns-nonfinal'}],
rawtxs=[ToHex(tx)],
)
self.log.info('A transaction that is locked by BIP68 sequence logic')
tx = FromHex(CTransaction(), raw_tx_reference)
# We could include it in the second block mined from now, but not the
# very next one
tx.vin[0].nSequence = 2
# Can skip re-signing the tx because of early rejection
self.check_mempool_result(
result_expected=[{'txid': tx.rehash(),
'allowed': False,
'reject-reason': 'non-BIP68-final'}],
rawtxs=[tx.serialize().hex()],
maxfeerate=0,
)
if __name__ == '__main__':
MempoolAcceptanceTest().main()
diff --git a/test/functional/mining_basic.py b/test/functional/mining_basic.py
index 0e1f9ee4b..8e67e2378 100755
--- a/test/functional/mining_basic.py
+++ b/test/functional/mining_basic.py
@@ -1,285 +1,285 @@
#!/usr/bin/env python3
# Copyright (c) 2014-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test mining RPCs
- getmininginfo
- getblocktemplate proposal mode
- submitblock"""
import copy
from decimal import Decimal
from test_framework.blocktools import TIME_GENESIS_BLOCK, create_coinbase
from test_framework.messages import BLOCK_HEADER_SIZE, CBlock, CBlockHeader
from test_framework.p2p import P2PDataStore
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import assert_equal, assert_raises_rpc_error
VERSIONBITS_TOP_BITS = 0x20000000
def assert_template(node, block, expect, rehash=True):
if rehash:
block.hashMerkleRoot = block.calc_merkle_root()
rsp = node.getblocktemplate(
template_request={
'data': block.serialize().hex(),
'mode': 'proposal'})
assert_equal(rsp, expect)
class MiningTest(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
self.supports_cli = False
def mine_chain(self):
self.log.info('Create some old blocks')
node = self.nodes[0]
address = node.get_deterministic_priv_key().address
for t in range(TIME_GENESIS_BLOCK,
TIME_GENESIS_BLOCK + 200 * 600, 600):
node.setmocktime(t)
self.generatetoaddress(node, 1, address, sync_fun=self.no_op)
mining_info = node.getmininginfo()
assert_equal(mining_info['blocks'], 200)
assert_equal(mining_info['currentblocktx'], 0)
assert_equal(mining_info['currentblocksize'], 1000)
self.log.info('test blockversion')
self.restart_node(
0, extra_args=[f'-mocktime={t}', '-blockversion=1337'])
self.connect_nodes(0, 1)
assert_equal(1337, self.nodes[0].getblocktemplate()['version'])
self.restart_node(0, extra_args=[f'-mocktime={t}'])
self.connect_nodes(0, 1)
assert_equal(
VERSIONBITS_TOP_BITS,
self.nodes[0].getblocktemplate()['version'])
self.restart_node(0)
self.connect_nodes(0, 1)
def run_test(self):
self.mine_chain()
node = self.nodes[0]
def assert_submitblock(block, result_str_1, result_str_2=None):
block.solve()
result_str_2 = result_str_2 or 'duplicate-invalid'
assert_equal(result_str_1, node.submitblock(
hexdata=block.serialize().hex()))
assert_equal(result_str_2, node.submitblock(
hexdata=block.serialize().hex()))
self.log.info('getmininginfo')
mining_info = node.getmininginfo()
assert_equal(mining_info['blocks'], 200)
assert_equal(mining_info['chain'], self.chain)
assert 'currentblocktx' not in mining_info
assert 'currentblocksize' not in mining_info
assert_equal(mining_info['difficulty'],
Decimal('4.656542373906925E-10'))
assert_equal(mining_info['networkhashps'],
Decimal('0.003333333333333334'))
assert_equal(mining_info['pooledtx'], 0)
# Mine a block to leave initial block download
self.generatetoaddress(
node, 1, node.get_deterministic_priv_key().address)
tmpl = node.getblocktemplate()
self.log.info("getblocktemplate: Test capability advertised")
assert 'proposal' in tmpl['capabilities']
next_height = int(tmpl["height"])
coinbase_tx = create_coinbase(height=next_height)
# sequence numbers must not be max for nLockTime to have effect
coinbase_tx.vin[0].nSequence = 2 ** 32 - 2
coinbase_tx.rehash()
block = CBlock()
block.nVersion = tmpl["version"]
block.hashPrevBlock = int(tmpl["previousblockhash"], 16)
block.nTime = tmpl["curtime"]
block.nBits = int(tmpl["bits"], 16)
block.nNonce = 0
block.vtx = [coinbase_tx]
self.log.info("getblocktemplate: Test valid block")
assert_template(node, block, None)
self.log.info("submitblock: Test block decode failure")
assert_raises_rpc_error(-22, "Block decode failed",
node.submitblock, block.serialize()[:-15].hex())
self.log.info(
"getblocktemplate: Test bad input hash for coinbase transaction")
bad_block = copy.deepcopy(block)
- bad_block.vtx[0].vin[0].prevout.hash += 1
+ bad_block.vtx[0].vin[0].prevout.txid += 1
bad_block.vtx[0].rehash()
assert_template(node, bad_block, 'bad-cb-missing')
self.log.info("submitblock: Test invalid coinbase transaction")
assert_raises_rpc_error(-22, "Block does not start with a coinbase",
node.submitblock, bad_block.serialize().hex())
self.log.info("getblocktemplate: Test truncated final transaction")
assert_raises_rpc_error(-22, "Block decode failed", node.getblocktemplate, {
'data': block.serialize()[:-1].hex(), 'mode': 'proposal'})
self.log.info("getblocktemplate: Test duplicate transaction")
bad_block = copy.deepcopy(block)
bad_block.vtx.append(bad_block.vtx[0])
assert_template(node, bad_block, 'bad-txns-duplicate')
assert_submitblock(bad_block, 'bad-txns-duplicate',
'bad-txns-duplicate')
self.log.info("getblocktemplate: Test invalid transaction")
bad_block = copy.deepcopy(block)
bad_tx = copy.deepcopy(bad_block.vtx[0])
- bad_tx.vin[0].prevout.hash = 255
+ bad_tx.vin[0].prevout.txid = 255
bad_tx.rehash()
bad_block.vtx.append(bad_tx)
assert_template(node, bad_block, 'bad-txns-inputs-missingorspent')
assert_submitblock(bad_block, 'bad-txns-inputs-missingorspent')
self.log.info("getblocktemplate: Test nonfinal transaction")
bad_block = copy.deepcopy(block)
bad_block.vtx[0].nLockTime = 2 ** 32 - 1
bad_block.vtx[0].rehash()
assert_template(node, bad_block, 'bad-txns-nonfinal')
assert_submitblock(bad_block, 'bad-txns-nonfinal')
self.log.info("getblocktemplate: Test bad tx count")
# The tx count is immediately after the block header
bad_block_sn = bytearray(block.serialize())
assert_equal(bad_block_sn[BLOCK_HEADER_SIZE], 1)
bad_block_sn[BLOCK_HEADER_SIZE] += 1
assert_raises_rpc_error(-22, "Block decode failed", node.getblocktemplate, {
'data': bad_block_sn.hex(), 'mode': 'proposal'})
self.log.info("getblocktemplate: Test bad bits")
bad_block = copy.deepcopy(block)
bad_block.nBits = 469762303 # impossible in the real world
assert_template(node, bad_block, 'bad-diffbits')
self.log.info("getblocktemplate: Test bad merkle root")
bad_block = copy.deepcopy(block)
bad_block.hashMerkleRoot += 1
assert_template(node, bad_block, 'bad-txnmrklroot', False)
assert_submitblock(bad_block, 'bad-txnmrklroot', 'bad-txnmrklroot')
self.log.info("getblocktemplate: Test bad timestamps")
bad_block = copy.deepcopy(block)
bad_block.nTime = 2 ** 31 - 1
assert_template(node, bad_block, 'time-too-new')
assert_submitblock(bad_block, 'time-too-new', 'time-too-new')
bad_block.nTime = 0
assert_template(node, bad_block, 'time-too-old')
assert_submitblock(bad_block, 'time-too-old', 'time-too-old')
self.log.info("getblocktemplate: Test not best block")
bad_block = copy.deepcopy(block)
bad_block.hashPrevBlock = 123
assert_template(node, bad_block, 'inconclusive-not-best-prevblk')
assert_submitblock(bad_block, 'prev-blk-not-found',
'prev-blk-not-found')
self.log.info('submitheader tests')
assert_raises_rpc_error(-22, 'Block header decode failed',
lambda: node.submitheader(hexdata='xx' * BLOCK_HEADER_SIZE))
assert_raises_rpc_error(-22, 'Block header decode failed',
lambda: node.submitheader(hexdata='ff' * (BLOCK_HEADER_SIZE - 2)))
assert_raises_rpc_error(-25, 'Must submit previous header',
lambda: node.submitheader(hexdata=super(CBlock, bad_block).serialize().hex()))
block.nTime += 1
block.solve()
def chain_tip(b_hash, *, status='headers-only', branchlen=1):
return {'hash': b_hash, 'height': 202,
'branchlen': branchlen, 'status': status}
assert chain_tip(block.hash) not in node.getchaintips()
node.submitheader(hexdata=block.serialize().hex())
assert chain_tip(block.hash) in node.getchaintips()
# Noop
node.submitheader(hexdata=CBlockHeader(block).serialize().hex())
assert chain_tip(block.hash) in node.getchaintips()
bad_block_root = copy.deepcopy(block)
bad_block_root.hashMerkleRoot += 2
bad_block_root.solve()
assert chain_tip(bad_block_root.hash) not in node.getchaintips()
node.submitheader(hexdata=CBlockHeader(
bad_block_root).serialize().hex())
assert chain_tip(bad_block_root.hash) in node.getchaintips()
# Should still reject invalid blocks, even if we have the header:
assert_equal(node.submitblock(
hexdata=bad_block_root.serialize().hex()), 'bad-txnmrklroot')
assert_equal(node.submitblock(
hexdata=bad_block_root.serialize().hex()), 'bad-txnmrklroot')
assert chain_tip(bad_block_root.hash) in node.getchaintips()
# We know the header for this invalid block, so should just return
# early without error:
node.submitheader(hexdata=CBlockHeader(
bad_block_root).serialize().hex())
assert chain_tip(bad_block_root.hash) in node.getchaintips()
bad_block_lock = copy.deepcopy(block)
bad_block_lock.vtx[0].nLockTime = 2**32 - 1
bad_block_lock.vtx[0].rehash()
bad_block_lock.hashMerkleRoot = bad_block_lock.calc_merkle_root()
bad_block_lock.solve()
assert_equal(node.submitblock(
hexdata=bad_block_lock.serialize().hex()), 'bad-txns-nonfinal')
assert_equal(node.submitblock(
hexdata=bad_block_lock.serialize().hex()), 'duplicate-invalid')
# Build a "good" block on top of the submitted bad block
bad_block2 = copy.deepcopy(block)
bad_block2.hashPrevBlock = bad_block_lock.sha256
bad_block2.solve()
assert_raises_rpc_error(-25, 'bad-prevblk', lambda: node.submitheader(
hexdata=CBlockHeader(bad_block2).serialize().hex()))
# Should reject invalid header right away
bad_block_time = copy.deepcopy(block)
bad_block_time.nTime = 1
bad_block_time.solve()
assert_raises_rpc_error(-25, 'time-too-old', lambda: node.submitheader(
hexdata=CBlockHeader(bad_block_time).serialize().hex()))
# Should ask for the block from a p2p node, if they announce the header
# as well:
peer = node.add_p2p_connection(P2PDataStore())
# Drop the first getheaders
peer.wait_for_getheaders(timeout=5)
peer.send_blocks_and_test(blocks=[block], node=node)
# Must be active now:
assert chain_tip(block.hash, status='active',
branchlen=0) in node.getchaintips()
# Building a few blocks should give the same results
self.generatetoaddress(
node, 10, node.get_deterministic_priv_key().address)
assert_raises_rpc_error(-25, 'time-too-old', lambda: node.submitheader(
hexdata=CBlockHeader(bad_block_time).serialize().hex()))
assert_raises_rpc_error(-25, 'bad-prevblk', lambda: node.submitheader(
hexdata=CBlockHeader(bad_block2).serialize().hex()))
node.submitheader(hexdata=CBlockHeader(block).serialize().hex())
node.submitheader(hexdata=CBlockHeader(
bad_block_root).serialize().hex())
# valid
assert_equal(node.submitblock(
hexdata=block.serialize().hex()), 'duplicate')
# Sanity check that maxtries supports large integers
self.generatetoaddress(node,
1, node.get_deterministic_priv_key().address, pow(
2, 32))
if __name__ == '__main__':
MiningTest().main()
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
index 7adbbd139..25fe1dcb3 100755
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -1,2307 +1,2307 @@
#!/usr/bin/env python3
# Copyright (c) 2010 ArtForz -- public domain half-a-node
# Copyright (c) 2012 Jeff Garzik
# Copyright (c) 2010-2019 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Bitcoin test framework primitive and message structures
CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....:
data structures that should map to corresponding structures in
bitcoin/primitives
msg_block, msg_tx, msg_headers, etc.:
data structures that represent network messages
ser_*, deser_*: functions that handle serialization/deserialization.
Classes use __slots__ to ensure extraneous attributes aren't accidentally added
by tests, compromising their intended effect.
"""
import copy
import hashlib
import random
import socket
import struct
import time
import unittest
from base64 import b64decode, b64encode
from enum import IntEnum
from io import BytesIO
from typing import List
from test_framework.siphash import siphash256
from test_framework.util import assert_equal, uint256_hex
MAX_LOCATOR_SZ = 101
MAX_BLOCK_BASE_SIZE = 1000000
MAX_BLOOM_FILTER_SIZE = 36000
MAX_BLOOM_HASH_FUNCS = 50
# 1,000,000 XEC in satoshis (legacy BCHA)
COIN = 100000000
# 1 XEC in satoshis
XEC = 100
MAX_MONEY = 21000000 * COIN
# Maximum length of incoming protocol messages
MAX_PROTOCOL_MESSAGE_LENGTH = 2 * 1024 * 1024
MAX_HEADERS_RESULTS = 2000 # Number of headers sent in one getheaders result
MAX_INV_SIZE = 50000 # Maximum number of entries in an 'inv' protocol message
NODE_NETWORK = (1 << 0)
NODE_GETUTXO = (1 << 1)
NODE_BLOOM = (1 << 2)
# NODE_WITNESS = (1 << 3)
# NODE_XTHIN = (1 << 4) # removed in v0.22.12
NODE_COMPACT_FILTERS = (1 << 6)
NODE_NETWORK_LIMITED = (1 << 10)
NODE_AVALANCHE = (1 << 24)
MSG_TX = 1
MSG_BLOCK = 2
MSG_FILTERED_BLOCK = 3
MSG_CMPCT_BLOCK = 4
MSG_AVA_PROOF = 0x1f000001
MSG_TYPE_MASK = 0xffffffff >> 2
FILTER_TYPE_BASIC = 0
# Serialization/deserialization tools
def sha256(s):
return hashlib.new('sha256', s).digest()
def hash256(s):
return sha256(sha256(s))
def ser_compact_size(size):
r = b""
if size < 253:
r = struct.pack("B", size)
elif size < 0x10000:
r = struct.pack("<BH", 253, size)
elif size < 0x100000000:
r = struct.pack("<BI", 254, size)
else:
r = struct.pack("<BQ", 255, size)
return r
def deser_compact_size(f):
nit = struct.unpack("<B", f.read(1))[0]
if nit == 253:
nit = struct.unpack("<H", f.read(2))[0]
elif nit == 254:
nit = struct.unpack("<I", f.read(4))[0]
elif nit == 255:
nit = struct.unpack("<Q", f.read(8))[0]
return nit
def deser_string(f):
nit = deser_compact_size(f)
return f.read(nit)
def ser_string(s):
return ser_compact_size(len(s)) + s
def deser_uint256(f):
r = 0
for i in range(8):
t = struct.unpack("<I", f.read(4))[0]
r += t << (i * 32)
return r
def ser_uint256(u):
rs = b""
for _ in range(8):
rs += struct.pack("<I", u & 0xFFFFFFFF)
u >>= 32
return rs
def uint256_from_str(s):
r = 0
t = struct.unpack("<IIIIIIII", s[:32])
for i in range(8):
r += t[i] << (i * 32)
return r
def uint256_from_compact(c):
nbytes = (c >> 24) & 0xFF
v = (c & 0xFFFFFF) << (8 * (nbytes - 3))
return v
# deser_function_name: Allow for an alternate deserialization function on the
# entries in the vector.
def deser_vector(f, c, deser_function_name=None):
nit = deser_compact_size(f)
r = []
for _ in range(nit):
t = c()
if deser_function_name:
getattr(t, deser_function_name)(f)
else:
t.deserialize(f)
r.append(t)
return r
# ser_function_name: Allow for an alternate serialization function on the
# entries in the vector.
def ser_vector(v, ser_function_name=None):
r = ser_compact_size(len(v))
for i in v:
if ser_function_name:
r += getattr(i, ser_function_name)()
else:
r += i.serialize()
return r
def deser_uint256_vector(f):
nit = deser_compact_size(f)
r = []
for _ in range(nit):
t = deser_uint256(f)
r.append(t)
return r
def ser_uint256_vector(v):
r = ser_compact_size(len(v))
for i in v:
r += ser_uint256(i)
return r
def deser_string_vector(f):
nit = deser_compact_size(f)
r = []
for _ in range(nit):
t = deser_string(f)
r.append(t)
return r
def ser_string_vector(v):
r = ser_compact_size(len(v))
for sv in v:
r += ser_string(sv)
return r
def FromHex(obj, hex_string):
"""Deserialize from a hex string representation (eg from RPC)"""
obj.deserialize(BytesIO(bytes.fromhex(hex_string)))
return obj
def ToHex(obj):
"""Convert a binary-serializable object to hex
(eg for submission via RPC)"""
return obj.serialize().hex()
# Objects that map to bitcoind objects, which can be serialized/deserialized
class CAddress:
__slots__ = ("net", "ip", "nServices", "port", "time")
# see https://github.com/bitcoin/bips/blob/master/bip-0155.mediawiki
NET_IPV4 = 1
ADDRV2_NET_NAME = {
NET_IPV4: "IPv4"
}
ADDRV2_ADDRESS_LENGTH = {
NET_IPV4: 4
}
def __init__(self):
self.time = 0
self.nServices = 1
self.net = self.NET_IPV4
self.ip = "0.0.0.0"
self.port = 0
def deserialize(self, f, *, with_time=True):
"""Deserialize from addrv1 format (pre-BIP155)"""
if with_time:
# VERSION messages serialize CAddress objects without time
self.time = struct.unpack("<I", f.read(4))[0]
self.nServices = struct.unpack("<Q", f.read(8))[0]
# We only support IPv4 which means skip 12 bytes and read the next 4 as
# IPv4 address.
f.read(12)
self.net = self.NET_IPV4
self.ip = socket.inet_ntoa(f.read(4))
self.port = struct.unpack(">H", f.read(2))[0]
def serialize(self, *, with_time=True):
"""Serialize in addrv1 format (pre-BIP155)"""
assert self.net == self.NET_IPV4
r = b""
if with_time:
# VERSION messages serialize CAddress objects without time
r += struct.pack("<I", self.time)
r += struct.pack("<Q", self.nServices)
r += b"\x00" * 10 + b"\xff" * 2
r += socket.inet_aton(self.ip)
r += struct.pack(">H", self.port)
return r
def deserialize_v2(self, f):
"""Deserialize from addrv2 format (BIP155)"""
self.time = struct.unpack("<I", f.read(4))[0]
self.nServices = deser_compact_size(f)
self.net = struct.unpack("B", f.read(1))[0]
assert self.net == self.NET_IPV4
address_length = deser_compact_size(f)
assert address_length == self.ADDRV2_ADDRESS_LENGTH[self.net]
self.ip = socket.inet_ntoa(f.read(4))
self.port = struct.unpack(">H", f.read(2))[0]
def serialize_v2(self):
"""Serialize in addrv2 format (BIP155)"""
assert self.net == self.NET_IPV4
r = b""
r += struct.pack("<I", self.time)
r += ser_compact_size(self.nServices)
r += struct.pack("B", self.net)
r += ser_compact_size(self.ADDRV2_ADDRESS_LENGTH[self.net])
r += socket.inet_aton(self.ip)
r += struct.pack(">H", self.port)
return r
def __repr__(self):
return (
f"CAddress(nServices={self.nServices} net={self.ADDRV2_NET_NAME[self.net]} "
f"addr={self.ip} port={self.port})"
)
class CInv:
__slots__ = ("hash", "type")
typemap = {
0: "Error",
MSG_TX: "TX",
MSG_BLOCK: "Block",
MSG_FILTERED_BLOCK: "filtered Block",
MSG_CMPCT_BLOCK: "CompactBlock",
MSG_AVA_PROOF: "avalanche proof",
}
def __init__(self, t=0, h=0):
self.type = t
self.hash = h
def deserialize(self, f):
self.type = struct.unpack("<i", f.read(4))[0]
self.hash = deser_uint256(f)
def serialize(self):
r = b""
r += struct.pack("<i", self.type)
r += ser_uint256(self.hash)
return r
def __repr__(self):
return f"CInv(type={self.typemap[self.type]} hash={uint256_hex(self.hash)})"
def __eq__(self, other):
return isinstance(
other, CInv) and self.hash == other.hash and self.type == other.type
class CBlockLocator:
__slots__ = ("nVersion", "vHave")
def __init__(self):
self.vHave = []
def deserialize(self, f):
# Ignore version field.
struct.unpack("<i", f.read(4))[0]
self.vHave = deser_uint256_vector(f)
def serialize(self):
r = b""
# Bitcoin ABC ignores version field. Set it to 0.
r += struct.pack("<i", 0)
r += ser_uint256_vector(self.vHave)
return r
def __repr__(self):
return f"CBlockLocator(vHave={self.vHave!r})"
class COutPoint:
- __slots__ = ("hash", "n")
+ __slots__ = ("txid", "n")
- def __init__(self, hash=0, n=0):
- self.hash = hash
+ def __init__(self, txid=0, n=0):
+ self.txid = txid
self.n = n
def deserialize(self, f):
- self.hash = deser_uint256(f)
+ self.txid = deser_uint256(f)
self.n = struct.unpack("<I", f.read(4))[0]
def serialize(self):
r = b""
- r += ser_uint256(self.hash)
+ r += ser_uint256(self.txid)
r += struct.pack("<I", self.n)
return r
def __repr__(self):
- return f"COutPoint(hash={uint256_hex(self.hash)} n={self.n})"
+ return f"COutPoint(txid={uint256_hex(self.txid)} n={self.n})"
class CTxIn:
__slots__ = ("nSequence", "prevout", "scriptSig")
def __init__(self, outpoint=None, scriptSig=b"", nSequence=0):
if outpoint is None:
self.prevout = COutPoint()
else:
self.prevout = outpoint
self.scriptSig = scriptSig
self.nSequence = nSequence
def deserialize(self, f):
self.prevout = COutPoint()
self.prevout.deserialize(f)
self.scriptSig = deser_string(f)
self.nSequence = struct.unpack("<I", f.read(4))[0]
def serialize(self):
r = b""
r += self.prevout.serialize()
r += ser_string(self.scriptSig)
r += struct.pack("<I", self.nSequence)
return r
def __repr__(self):
return (
f"CTxIn(prevout={self.prevout!r} scriptSig={self.scriptSig.hex()} "
f"nSequence={self.nSequence})"
)
class CTxOut:
__slots__ = ("nValue", "scriptPubKey")
def __init__(self, nValue=0, scriptPubKey=b""):
self.nValue = nValue
self.scriptPubKey = scriptPubKey
def deserialize(self, f):
self.nValue = struct.unpack("<q", f.read(8))[0]
self.scriptPubKey = deser_string(f)
def serialize(self):
r = b""
r += struct.pack("<q", self.nValue)
r += ser_string(self.scriptPubKey)
return r
def __repr__(self):
return (
f"CTxOut(nValue={self.nValue // XEC}.{self.nValue % XEC:02d} "
f"scriptPubKey={self.scriptPubKey.hex()})"
)
class CTransaction:
__slots__ = ("hash", "nLockTime", "nVersion", "sha256", "vin", "vout")
def __init__(self, tx=None):
if tx is None:
self.nVersion = 1
self.vin = []
self.vout = []
self.nLockTime = 0
self.sha256 = None
self.hash = None
else:
self.nVersion = tx.nVersion
self.vin = copy.deepcopy(tx.vin)
self.vout = copy.deepcopy(tx.vout)
self.nLockTime = tx.nLockTime
self.sha256 = tx.sha256
self.hash = tx.hash
def deserialize(self, f):
self.nVersion = struct.unpack("<i", f.read(4))[0]
self.vin = deser_vector(f, CTxIn)
self.vout = deser_vector(f, CTxOut)
self.nLockTime = struct.unpack("<I", f.read(4))[0]
self.sha256 = None
self.hash = None
def billable_size(self):
"""
Returns the size used for billing the against the transaction
"""
return len(self.serialize())
def serialize(self):
r = b""
r += struct.pack("<i", self.nVersion)
r += ser_vector(self.vin)
r += ser_vector(self.vout)
r += struct.pack("<I", self.nLockTime)
return r
# Recalculate the txid
def rehash(self):
self.sha256 = None
self.calc_sha256()
return self.hash
# self.sha256 and self.hash -- those are expected to be the txid.
def calc_sha256(self):
if self.sha256 is None:
self.sha256 = uint256_from_str(hash256(self.serialize()))
self.hash = hash256(self.serialize())[::-1].hex()
def get_id(self):
# For now, just forward the hash.
self.calc_sha256()
return self.hash
def is_valid(self):
self.calc_sha256()
for tout in self.vout:
if tout.nValue < 0 or tout.nValue > MAX_MONEY:
return False
return True
def __repr__(self):
return (
f"CTransaction(nVersion={self.nVersion} vin={self.vin!r} "
f"vout={self.vout!r} nLockTime={self.nLockTime})"
)
class CBlockHeader:
__slots__ = ("hash", "hashMerkleRoot", "hashPrevBlock", "nBits", "nNonce",
"nTime", "nVersion", "sha256")
def __init__(self, header=None):
if header is None:
self.set_null()
else:
self.nVersion = header.nVersion
self.hashPrevBlock = header.hashPrevBlock
self.hashMerkleRoot = header.hashMerkleRoot
self.nTime = header.nTime
self.nBits = header.nBits
self.nNonce = header.nNonce
self.sha256 = header.sha256
self.hash = header.hash
self.calc_sha256()
def set_null(self):
self.nVersion = 1
self.hashPrevBlock = 0
self.hashMerkleRoot = 0
self.nTime = 0
self.nBits = 0
self.nNonce = 0
self.sha256 = None
self.hash = None
def deserialize(self, f):
self.nVersion = struct.unpack("<i", f.read(4))[0]
self.hashPrevBlock = deser_uint256(f)
self.hashMerkleRoot = deser_uint256(f)
self.nTime = struct.unpack("<I", f.read(4))[0]
self.nBits = struct.unpack("<I", f.read(4))[0]
self.nNonce = struct.unpack("<I", f.read(4))[0]
self.sha256 = None
self.hash = None
def serialize(self):
r = b""
r += struct.pack("<i", self.nVersion)
r += ser_uint256(self.hashPrevBlock)
r += ser_uint256(self.hashMerkleRoot)
r += struct.pack("<I", self.nTime)
r += struct.pack("<I", self.nBits)
r += struct.pack("<I", self.nNonce)
return r
def calc_sha256(self):
if self.sha256 is None:
r = b""
r += struct.pack("<i", self.nVersion)
r += ser_uint256(self.hashPrevBlock)
r += ser_uint256(self.hashMerkleRoot)
r += struct.pack("<I", self.nTime)
r += struct.pack("<I", self.nBits)
r += struct.pack("<I", self.nNonce)
self.sha256 = uint256_from_str(hash256(r))
self.hash = hash256(r)[::-1].hex()
def rehash(self):
self.sha256 = None
self.calc_sha256()
return self.sha256
def __repr__(self):
return (
f"CBlockHeader(nVersion={self.nVersion} "
f"hashPrevBlock={uint256_hex(self.hashPrevBlock)} "
f"hashMerkleRoot={uint256_hex(self.hashMerkleRoot)} nTime={self.nTime} "
f"nBits={self.nBits:08x} nNonce={self.nNonce:08x})"
)
BLOCK_HEADER_SIZE = len(CBlockHeader().serialize())
assert_equal(BLOCK_HEADER_SIZE, 80)
class CBlock(CBlockHeader):
__slots__ = ("vtx",)
def __init__(self, header=None):
super().__init__(header)
self.vtx: List[CTransaction] = []
def deserialize(self, f):
super().deserialize(f)
self.vtx = deser_vector(f, CTransaction)
def serialize(self):
r = b""
r += super().serialize()
r += ser_vector(self.vtx)
return r
# Calculate the merkle root given a vector of transaction hashes
def get_merkle_root(self, hashes):
while len(hashes) > 1:
newhashes = []
for i in range(0, len(hashes), 2):
i2 = min(i + 1, len(hashes) - 1)
newhashes.append(hash256(hashes[i] + hashes[i2]))
hashes = newhashes
return uint256_from_str(hashes[0])
def calc_merkle_root(self):
hashes = []
for tx in self.vtx:
tx.calc_sha256()
hashes.append(ser_uint256(tx.sha256))
return self.get_merkle_root(hashes)
def is_valid(self):
self.calc_sha256()
target = uint256_from_compact(self.nBits)
if self.sha256 > target:
return False
for tx in self.vtx:
if not tx.is_valid():
return False
if self.calc_merkle_root() != self.hashMerkleRoot:
return False
return True
def solve(self):
self.rehash()
target = uint256_from_compact(self.nBits)
while self.sha256 > target:
self.nNonce += 1
self.rehash()
def __repr__(self):
return (
f"CBlock(nVersion={self.nVersion} "
f"hashPrevBlock={uint256_hex(self.hashPrevBlock)} "
f"hashMerkleRoot={uint256_hex(self.hashMerkleRoot)} "
f"nTime={self.nTime} nBits={self.nBits:08x} "
f"nNonce={self.nNonce:08x} vtx={self.vtx!r})"
)
class PrefilledTransaction:
__slots__ = ("index", "tx")
def __init__(self, index=0, tx=None):
self.index = index
self.tx = tx
def deserialize(self, f):
self.index = deser_compact_size(f)
self.tx = CTransaction()
self.tx.deserialize(f)
def serialize(self):
r = b""
r += ser_compact_size(self.index)
r += self.tx.serialize()
return r
def __repr__(self):
return f"PrefilledTransaction(index={self.index}, tx={self.tx!r})"
# This is what we send on the wire, in a cmpctblock message.
class P2PHeaderAndShortIDs:
__slots__ = ("header", "nonce", "prefilled_txn", "prefilled_txn_length",
"shortids", "shortids_length")
def __init__(self):
self.header = CBlockHeader()
self.nonce = 0
self.shortids_length = 0
self.shortids = []
self.prefilled_txn_length = 0
self.prefilled_txn = []
def deserialize(self, f):
self.header.deserialize(f)
self.nonce = struct.unpack("<Q", f.read(8))[0]
self.shortids_length = deser_compact_size(f)
for _ in range(self.shortids_length):
# shortids are defined to be 6 bytes in the spec, so append
# two zero bytes and read it in as an 8-byte number
self.shortids.append(
struct.unpack("<Q", f.read(6) + b'\x00\x00')[0])
self.prefilled_txn = deser_vector(f, PrefilledTransaction)
self.prefilled_txn_length = len(self.prefilled_txn)
def serialize(self):
r = b""
r += self.header.serialize()
r += struct.pack("<Q", self.nonce)
r += ser_compact_size(self.shortids_length)
for x in self.shortids:
# We only want the first 6 bytes
r += struct.pack("<Q", x)[0:6]
r += ser_vector(self.prefilled_txn)
return r
def __repr__(self):
return (
f"P2PHeaderAndShortIDs(header={self.header!r}, nonce={self.nonce}, "
f"shortids_length={self.shortids_length}, shortids={self.shortids!r}, "
f"prefilled_txn_length={self.prefilled_txn_length}, "
f"prefilledtxn={self.prefilled_txn!r}"
)
def calculate_shortid(k0, k1, tx_hash):
"""Calculate the BIP 152-compact blocks shortid for a given
transaction hash"""
expected_shortid = siphash256(k0, k1, tx_hash)
expected_shortid &= 0x0000ffffffffffff
return expected_shortid
# This version gets rid of the array lengths, and reinterprets the differential
# encoding into indices that can be used for lookup.
class HeaderAndShortIDs:
__slots__ = ("header", "nonce", "prefilled_txn", "shortids")
def __init__(self, p2pheaders_and_shortids=None):
self.header = CBlockHeader()
self.nonce = 0
self.shortids = []
self.prefilled_txn = []
if p2pheaders_and_shortids is not None:
self.header = p2pheaders_and_shortids.header
self.nonce = p2pheaders_and_shortids.nonce
self.shortids = p2pheaders_and_shortids.shortids
last_index = -1
for x in p2pheaders_and_shortids.prefilled_txn:
self.prefilled_txn.append(
PrefilledTransaction(x.index + last_index + 1, x.tx))
last_index = self.prefilled_txn[-1].index
def to_p2p(self):
ret = P2PHeaderAndShortIDs()
ret.header = self.header
ret.nonce = self.nonce
ret.shortids_length = len(self.shortids)
ret.shortids = self.shortids
ret.prefilled_txn_length = len(self.prefilled_txn)
ret.prefilled_txn = []
last_index = -1
for x in self.prefilled_txn:
ret.prefilled_txn.append(
PrefilledTransaction(x.index - last_index - 1, x.tx))
last_index = x.index
return ret
def get_siphash_keys(self):
header_nonce = self.header.serialize()
header_nonce += struct.pack("<Q", self.nonce)
hash_header_nonce_as_str = sha256(header_nonce)
key0 = struct.unpack("<Q", hash_header_nonce_as_str[0:8])[0]
key1 = struct.unpack("<Q", hash_header_nonce_as_str[8:16])[0]
return [key0, key1]
# Version 2 compact blocks use wtxid in shortids (rather than txid)
def initialize_from_block(self, block, nonce=0, prefill_list=None):
if prefill_list is None:
prefill_list = [0]
self.header = CBlockHeader(block)
self.nonce = nonce
self.prefilled_txn = [PrefilledTransaction(i, block.vtx[i])
for i in prefill_list]
self.shortids = []
[k0, k1] = self.get_siphash_keys()
for i in range(len(block.vtx)):
if i not in prefill_list:
tx_hash = block.vtx[i].sha256
self.shortids.append(calculate_shortid(k0, k1, tx_hash))
def __repr__(self):
return (
f"HeaderAndShortIDs(header={self.header!r}, nonce={self.nonce}, "
f"shortids={self.shortids!r}, prefilledtxn={self.prefilled_txn!r}"
)
class BlockTransactionsRequest:
__slots__ = ("blockhash", "indexes")
def __init__(self, blockhash=0, indexes=None):
self.blockhash = blockhash
self.indexes = indexes if indexes is not None else []
def deserialize(self, f):
self.blockhash = deser_uint256(f)
indexes_length = deser_compact_size(f)
for _ in range(indexes_length):
self.indexes.append(deser_compact_size(f))
def serialize(self):
r = b""
r += ser_uint256(self.blockhash)
r += ser_compact_size(len(self.indexes))
for x in self.indexes:
r += ser_compact_size(x)
return r
# helper to set the differentially encoded indexes from absolute ones
def from_absolute(self, absolute_indexes):
self.indexes = []
last_index = -1
for x in absolute_indexes:
self.indexes.append(x - last_index - 1)
last_index = x
def to_absolute(self):
absolute_indexes = []
last_index = -1
for x in self.indexes:
absolute_indexes.append(x + last_index + 1)
last_index = absolute_indexes[-1]
return absolute_indexes
def __repr__(self):
return (
f"BlockTransactionsRequest(hash={uint256_hex(self.blockhash)} "
f"indexes={self.indexes!r})"
)
class BlockTransactions:
__slots__ = ("blockhash", "transactions")
def __init__(self, blockhash=0, transactions=None):
self.blockhash = blockhash
self.transactions = transactions if transactions is not None else []
def deserialize(self, f):
self.blockhash = deser_uint256(f)
self.transactions = deser_vector(f, CTransaction)
def serialize(self):
r = b""
r += ser_uint256(self.blockhash)
r += ser_vector(self.transactions)
return r
def __repr__(self):
return (
f"BlockTransactions(hash={uint256_hex(self.blockhash)} "
f"transactions={self.transactions!r})"
)
class AvalancheStake:
__slots__ = ("utxo", "amount", "height", "pubkey", "is_coinbase")
def __init__(self, utxo=None, amount=0, height=0,
pubkey=b"", is_coinbase=False):
self.utxo: COutPoint = utxo or COutPoint()
self.amount: int = amount
"""Amount in satoshis (int64)"""
self.height: int = height
"""Block height containing this utxo (uint32)"""
self.pubkey: bytes = pubkey
"""Public key"""
self.is_coinbase: bool = is_coinbase
def deserialize(self, f):
self.utxo = COutPoint()
self.utxo.deserialize(f)
self.amount = struct.unpack("<q", f.read(8))[0]
height_ser = struct.unpack("<I", f.read(4))[0]
self.is_coinbase = bool(height_ser & 1)
self.height = height_ser >> 1
self.pubkey = deser_string(f)
def serialize(self) -> bytes:
r = self.utxo.serialize()
height_ser = self.height << 1 | int(self.is_coinbase)
r += struct.pack('<q', self.amount)
r += struct.pack('<I', height_ser)
r += ser_compact_size(len(self.pubkey))
r += self.pubkey
return r
def __repr__(self):
return f"AvalancheStake(utxo={self.utxo}, amount={self.amount}," \
f" height={self.height}, " \
f"pubkey={self.pubkey.hex()})"
class AvalancheSignedStake:
__slots__ = ("stake", "sig")
def __init__(self, stake=None, sig=b""):
self.stake: AvalancheStake = stake or AvalancheStake()
self.sig: bytes = sig
"""Signature for this stake, bytes of length 64"""
def deserialize(self, f):
self.stake = AvalancheStake()
self.stake.deserialize(f)
self.sig = f.read(64)
def serialize(self) -> bytes:
return self.stake.serialize() + self.sig
class AvalancheProof:
__slots__ = (
"sequence",
"expiration",
"master",
"stakes",
"payout_script",
"signature",
"limited_proofid",
"proofid")
def __init__(self, sequence=0, expiration=0,
master=b"", signed_stakes=None, payout_script=b"", signature=b""):
self.sequence: int = sequence
self.expiration: int = expiration
self.master: bytes = master
self.stakes: List[AvalancheSignedStake] = signed_stakes or [
AvalancheSignedStake()]
self.payout_script = payout_script
self.signature = signature
self.limited_proofid: int = None
self.proofid: int = None
self.compute_proof_id()
def compute_proof_id(self):
"""Compute Bitcoin's 256-bit hash (double SHA-256) of the
serialized proof data.
"""
ss = struct.pack("<Qq", self.sequence, self.expiration)
ss += ser_string(self.payout_script)
ss += ser_compact_size(len(self.stakes))
# Use unsigned stakes
for s in self.stakes:
ss += s.stake.serialize()
h = hash256(ss)
self.limited_proofid = uint256_from_str(h)
h += ser_string(self.master)
h = hash256(h)
# make it an int, for comparing with Delegation.proofid
self.proofid = uint256_from_str(h)
def deserialize(self, f):
self.sequence = struct.unpack("<Q", f.read(8))[0]
self.expiration = struct.unpack("<q", f.read(8))[0]
self.master = deser_string(f)
self.stakes = deser_vector(f, AvalancheSignedStake)
self.payout_script = deser_string(f)
self.signature = f.read(64)
self.compute_proof_id()
def serialize(self):
r = b""
r += struct.pack("<Q", self.sequence)
r += struct.pack("<q", self.expiration)
r += ser_string(self.master)
r += ser_vector(self.stakes)
r += ser_string(self.payout_script)
r += self.signature
return r
def __repr__(self):
return f"AvalancheProof(proofid={uint256_hex(self.proofid)}, " \
f"limited_proofid={uint256_hex(self.limited_proofid)}, " \
f"sequence={self.sequence}, " \
f"expiration={self.expiration}, " \
f"master={self.master.hex()}, " \
f"payout_script={self.payout_script.hex()}, " \
f"signature={b64encode(self.signature)}, " \
f"stakes={self.stakes})"
class AvalanchePrefilledProof:
__slots__ = ("index", "proof")
def __init__(self, index=0, proof=None):
self.index = index
self.proof = proof or AvalancheProof()
def deserialize(self, f):
self.index = deser_compact_size(f)
self.proof.deserialize(f)
def serialize(self):
r = b""
r += ser_compact_size(self.index)
r += self.proof.serialize()
return r
def __repr__(self):
return f"AvalanchePrefilledProof(index={self.index}, proof={self.proof!r})"
class AvalanchePoll:
__slots__ = ("round", "invs")
def __init__(self, round=0, invs=None):
self.round = round
self.invs = invs if invs is not None else []
def deserialize(self, f):
self.round = struct.unpack("<q", f.read(8))[0]
self.invs = deser_vector(f, CInv)
def serialize(self):
r = b""
r += struct.pack("<q", self.round)
r += ser_vector(self.invs)
return r
def __repr__(self):
return f"AvalanchePoll(round={self.round}, invs={self.invs!r})"
class AvalancheVoteError(IntEnum):
ACCEPTED = 0
INVALID = 1
PARKED = 2
FORK = 3
UNKNOWN = -1
MISSING = -2
PENDING = -3
class AvalancheProofVoteResponse(IntEnum):
ACTIVE = 0
REJECTED = 1
IMMATURE = 2
CONFLICT = 3
UNKNOWN = -1
class AvalancheTxVoteError(IntEnum):
ACCEPTED = 0
INVALID = 1
ORPHAN = 2
UNKNOWN = -1
class AvalancheVote:
__slots__ = ("error", "hash")
def __init__(self, e=0, h=0):
self.error = e
self.hash = h
def deserialize(self, f):
self.error = struct.unpack("<i", f.read(4))[0]
self.hash = deser_uint256(f)
def serialize(self):
r = b""
r += struct.pack("<i", self.error)
r += ser_uint256(self.hash)
return r
def __repr__(self):
return f"AvalancheVote(error={self.error}, hash={uint256_hex(self.hash)})"
class AvalancheResponse:
__slots__ = ("round", "cooldown", "votes")
def __init__(self, round=0, cooldown=0, votes=None):
self.round = round
self.cooldown = cooldown
self.votes = votes if votes is not None else []
def deserialize(self, f):
self.round = struct.unpack("<q", f.read(8))[0]
self.cooldown = struct.unpack("<i", f.read(4))[0]
self.votes = deser_vector(f, AvalancheVote)
def serialize(self):
r = b""
r += struct.pack("<q", self.round)
r += struct.pack("<i", self.cooldown)
r += ser_vector(self.votes)
return r
def get_hash(self):
return hash256(self.serialize())
def __repr__(self):
return (
f"AvalancheResponse(round={self.round}, cooldown={self.cooldown}, "
f"votes={self.votes!r})"
)
class TCPAvalancheResponse:
__slots__ = ("response", "sig")
def __init__(self, response=AvalancheResponse(), sig=b"\0" * 64):
self.response = response
self.sig = sig
def deserialize(self, f):
self.response.deserialize(f)
self.sig = f.read(64)
def serialize(self):
r = b""
r += self.response.serialize()
r += self.sig
return r
def __repr__(self):
return f"TCPAvalancheResponse(response={self.response!r}, sig={self.sig})"
class AvalancheDelegationLevel:
__slots__ = ("pubkey", "sig")
def __init__(self, pubkey=b"", sig=b"\0" * 64):
self.pubkey = pubkey
self.sig = sig
def deserialize(self, f):
self.pubkey = deser_string(f)
self.sig = f.read(64)
def serialize(self):
r = b""
r += ser_string(self.pubkey)
r += self.sig
return r
def __repr__(self):
return f"AvalancheDelegationLevel(pubkey={self.pubkey.hex()}, sig={self.sig})"
class AvalancheDelegation:
__slots__ = ("limited_proofid", "proof_master", "proofid", "levels")
def __init__(self, limited_proofid=0,
proof_master=b"", levels=None):
self.limited_proofid: int = limited_proofid
self.proof_master: bytes = proof_master
self.levels: List[AvalancheDelegationLevel] = levels or []
self.proofid: int = self.compute_proofid()
def compute_proofid(self) -> int:
return uint256_from_str(hash256(
ser_uint256(self.limited_proofid) + ser_string(self.proof_master)))
def deserialize(self, f):
self.limited_proofid = deser_uint256(f)
self.proof_master = deser_string(f)
self.levels = deser_vector(f, AvalancheDelegationLevel)
self.proofid = self.compute_proofid()
def serialize(self):
r = b""
r += ser_uint256(self.limited_proofid)
r += ser_string(self.proof_master)
r += ser_vector(self.levels)
return r
def __repr__(self):
return f"AvalancheDelegation(" \
f"limitedProofId={uint256_hex(self.limited_proofid)}, " \
f"proofMaster={self.proof_master.hex()}, " \
f"proofid={uint256_hex(self.proofid)}, " \
f"levels={self.levels})"
def getid(self):
h = ser_uint256(self.proofid)
for level in self.levels:
h = hash256(h + ser_string(level.pubkey))
return h
class AvalancheHello:
__slots__ = ("delegation", "sig")
def __init__(self, delegation=AvalancheDelegation(), sig=b"\0" * 64):
self.delegation = delegation
self.sig = sig
def deserialize(self, f):
self.delegation.deserialize(f)
self.sig = f.read(64)
def serialize(self):
r = b""
r += self.delegation.serialize()
r += self.sig
return r
def __repr__(self):
return f"AvalancheHello(delegation={self.delegation!r}, sig={self.sig})"
def get_sighash(self, node):
b = self.delegation.getid()
b += struct.pack("<Q", node.remote_nonce)
b += struct.pack("<Q", node.local_nonce)
b += struct.pack("<Q", node.remote_extra_entropy)
b += struct.pack("<Q", node.local_extra_entropy)
return hash256(b)
class CPartialMerkleTree:
__slots__ = ("nTransactions", "vBits", "vHash")
def __init__(self):
self.nTransactions = 0
self.vHash = []
self.vBits = []
def deserialize(self, f):
self.nTransactions = struct.unpack("<i", f.read(4))[0]
self.vHash = deser_uint256_vector(f)
vBytes = deser_string(f)
self.vBits = []
for i in range(len(vBytes) * 8):
self.vBits.append(vBytes[i // 8] & (1 << (i % 8)) != 0)
def serialize(self):
r = b""
r += struct.pack("<i", self.nTransactions)
r += ser_uint256_vector(self.vHash)
vBytesArray = bytearray([0x00] * ((len(self.vBits) + 7) // 8))
for i in range(len(self.vBits)):
vBytesArray[i // 8] |= self.vBits[i] << (i % 8)
r += ser_string(bytes(vBytesArray))
return r
def __repr__(self):
return (
f"CPartialMerkleTree(nTransactions={self.nTransactions}, "
f"Hash={self.vHash!r}, vBits={self.vBits!r})"
)
class CMerkleBlock:
__slots__ = ("header", "txn")
def __init__(self):
self.header = CBlockHeader()
self.txn = CPartialMerkleTree()
def deserialize(self, f):
self.header.deserialize(f)
self.txn.deserialize(f)
def serialize(self):
r = b""
r += self.header.serialize()
r += self.txn.serialize()
return r
def __repr__(self):
return f"CMerkleBlock(header={self.header!r}, txn={self.txn!r})"
# Objects that correspond to messages on the wire
class msg_version:
__slots__ = ("addrFrom", "addrTo", "nNonce", "relay", "nServices",
"nStartingHeight", "nTime", "nVersion", "strSubVer", "nExtraEntropy")
msgtype = b"version"
def __init__(self):
self.nVersion = 0
self.nServices = 0
self.nTime = int(time.time())
self.addrTo = CAddress()
self.addrFrom = CAddress()
self.nNonce = random.getrandbits(64)
self.strSubVer = ''
self.nStartingHeight = -1
self.relay = 0
self.nExtraEntropy = random.getrandbits(64)
def deserialize(self, f):
self.nVersion = struct.unpack("<i", f.read(4))[0]
self.nServices = struct.unpack("<Q", f.read(8))[0]
self.nTime = struct.unpack("<q", f.read(8))[0]
self.addrTo = CAddress()
self.addrTo.deserialize(f, with_time=False)
self.addrFrom = CAddress()
self.addrFrom.deserialize(f, with_time=False)
self.nNonce = struct.unpack("<Q", f.read(8))[0]
self.strSubVer = deser_string(f).decode('utf-8')
self.nStartingHeight = struct.unpack("<i", f.read(4))[0]
self.relay = struct.unpack("<b", f.read(1))[0]
self.nExtraEntropy = struct.unpack("<Q", f.read(8))[0]
def serialize(self):
r = b""
r += struct.pack("<i", self.nVersion)
r += struct.pack("<Q", self.nServices)
r += struct.pack("<q", self.nTime)
r += self.addrTo.serialize(with_time=False)
r += self.addrFrom.serialize(with_time=False)
r += struct.pack("<Q", self.nNonce)
r += ser_string(self.strSubVer.encode('utf-8'))
r += struct.pack("<i", self.nStartingHeight)
r += struct.pack("<b", self.relay)
r += struct.pack("<Q", self.nExtraEntropy)
return r
def __repr__(self):
return (
f'msg_version(nVersion={self.nVersion} nServices={self.nServices} '
f'nTime={self.nTime} addrTo={self.addrTo!r} addrFrom={self.addrFrom!r} '
f'nNonce=0x{self.nNonce:016X} strSubVer={self.strSubVer} '
f'nStartingHeight={self.nStartingHeight} relay={self.relay} '
f'nExtraEntropy={self.nExtraEntropy})'
)
class msg_verack:
__slots__ = ()
msgtype = b"verack"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_verack()"
class msg_addr:
__slots__ = ("addrs",)
msgtype = b"addr"
def __init__(self):
self.addrs = []
def deserialize(self, f):
self.addrs = deser_vector(f, CAddress)
def serialize(self):
return ser_vector(self.addrs)
def __repr__(self):
return f"msg_addr(addrs={self.addrs!r})"
class msg_addrv2:
__slots__ = ("addrs",)
msgtype = b"addrv2"
def __init__(self):
self.addrs = []
def deserialize(self, f):
self.addrs = deser_vector(f, CAddress, "deserialize_v2")
def serialize(self):
return ser_vector(self.addrs, "serialize_v2")
def __repr__(self):
return f"msg_addrv2(addrs={self.addrs!r})"
class msg_sendaddrv2:
__slots__ = ()
msgtype = b"sendaddrv2"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_sendaddrv2()"
class msg_inv:
__slots__ = ("inv",)
msgtype = b"inv"
def __init__(self, inv=None):
if inv is None:
self.inv = []
else:
self.inv = inv
def deserialize(self, f):
self.inv = deser_vector(f, CInv)
def serialize(self):
return ser_vector(self.inv)
def __repr__(self):
return f"msg_inv(inv={self.inv!r})"
class msg_getdata:
__slots__ = ("inv",)
msgtype = b"getdata"
def __init__(self, inv=None):
self.inv = inv if inv is not None else []
def deserialize(self, f):
self.inv = deser_vector(f, CInv)
def serialize(self):
return ser_vector(self.inv)
def __repr__(self):
return f"msg_getdata(inv={self.inv!r})"
class msg_getblocks:
__slots__ = ("locator", "hashstop")
msgtype = b"getblocks"
def __init__(self):
self.locator = CBlockLocator()
self.hashstop = 0
def deserialize(self, f):
self.locator = CBlockLocator()
self.locator.deserialize(f)
self.hashstop = deser_uint256(f)
def serialize(self):
r = b""
r += self.locator.serialize()
r += ser_uint256(self.hashstop)
return r
def __repr__(self):
return (
f"msg_getblocks(locator={self.locator!r} "
f"hashstop={uint256_hex(self.hashstop)})"
)
class msg_tx:
__slots__ = ("tx",)
msgtype = b"tx"
def __init__(self, tx=CTransaction()):
self.tx = tx
def deserialize(self, f):
self.tx.deserialize(f)
def serialize(self):
return self.tx.serialize()
def __repr__(self):
return f"msg_tx(tx={self.tx!r})"
class msg_block:
__slots__ = ("block",)
msgtype = b"block"
def __init__(self, block=None):
if block is None:
self.block = CBlock()
else:
self.block = block
def deserialize(self, f):
self.block.deserialize(f)
def serialize(self):
return self.block.serialize()
def __repr__(self):
return f"msg_block(block={self.block!r})"
# for cases where a user needs tighter control over what is sent over the wire
# note that the user must supply the name of the msgtype, and the data
class msg_generic:
__slots__ = ("data")
def __init__(self, msgtype, data=None):
self.msgtype = msgtype
self.data = data
def serialize(self):
return self.data
def __repr__(self):
return "msg_generic()"
class msg_getaddr:
__slots__ = ()
msgtype = b"getaddr"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_getaddr()"
class msg_ping:
__slots__ = ("nonce",)
msgtype = b"ping"
def __init__(self, nonce=0):
self.nonce = nonce
def deserialize(self, f):
self.nonce = struct.unpack("<Q", f.read(8))[0]
def serialize(self):
r = b""
r += struct.pack("<Q", self.nonce)
return r
def __repr__(self):
return f"msg_ping(nonce={self.nonce:08x})"
class msg_pong:
__slots__ = ("nonce",)
msgtype = b"pong"
def __init__(self, nonce=0):
self.nonce = nonce
def deserialize(self, f):
self.nonce = struct.unpack("<Q", f.read(8))[0]
def serialize(self):
r = b""
r += struct.pack("<Q", self.nonce)
return r
def __repr__(self):
return f"msg_pong(nonce={self.nonce:08x})"
class msg_mempool:
__slots__ = ()
msgtype = b"mempool"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_mempool()"
class msg_notfound:
__slots__ = ("vec", )
msgtype = b"notfound"
def __init__(self, vec=None):
self.vec = vec or []
def deserialize(self, f):
self.vec = deser_vector(f, CInv)
def serialize(self):
return ser_vector(self.vec)
def __repr__(self):
return f"msg_notfound(vec={self.vec!r})"
class msg_sendheaders:
__slots__ = ()
msgtype = b"sendheaders"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_sendheaders()"
# getheaders message has
# number of entries
# vector of hashes
# hash_stop (hash of last desired block header, 0 to get as many as possible)
class msg_getheaders:
__slots__ = ("hashstop", "locator",)
msgtype = b"getheaders"
def __init__(self):
self.locator = CBlockLocator()
self.hashstop = 0
def deserialize(self, f):
self.locator = CBlockLocator()
self.locator.deserialize(f)
self.hashstop = deser_uint256(f)
def serialize(self):
r = b""
r += self.locator.serialize()
r += ser_uint256(self.hashstop)
return r
def __repr__(self):
return (
f"msg_getheaders(locator={self.locator!r}, "
f"stop={uint256_hex(self.hashstop)})"
)
# headers message has
# <count> <vector of block headers>
class msg_headers:
__slots__ = ("headers",)
msgtype = b"headers"
def __init__(self, headers=None):
self.headers = headers if headers is not None else []
def deserialize(self, f):
# comment in bitcoind indicates these should be deserialized as blocks
blocks = deser_vector(f, CBlock)
for x in blocks:
self.headers.append(CBlockHeader(x))
def serialize(self):
blocks = [CBlock(x) for x in self.headers]
return ser_vector(blocks)
def __repr__(self):
return f"msg_headers(headers={self.headers!r})"
class msg_merkleblock:
__slots__ = ("merkleblock",)
msgtype = b"merkleblock"
def __init__(self, merkleblock=None):
if merkleblock is None:
self.merkleblock = CMerkleBlock()
else:
self.merkleblock = merkleblock
def deserialize(self, f):
self.merkleblock.deserialize(f)
def serialize(self):
return self.merkleblock.serialize()
def __repr__(self):
return f"msg_merkleblock(merkleblock={self.merkleblock!r})"
class msg_filterload:
__slots__ = ("data", "nHashFuncs", "nTweak", "nFlags")
msgtype = b"filterload"
def __init__(self, data=b'00', nHashFuncs=0, nTweak=0, nFlags=0):
self.data = data
self.nHashFuncs = nHashFuncs
self.nTweak = nTweak
self.nFlags = nFlags
def deserialize(self, f):
self.data = deser_string(f)
self.nHashFuncs = struct.unpack("<I", f.read(4))[0]
self.nTweak = struct.unpack("<I", f.read(4))[0]
self.nFlags = struct.unpack("<B", f.read(1))[0]
def serialize(self):
r = b""
r += ser_string(self.data)
r += struct.pack("<I", self.nHashFuncs)
r += struct.pack("<I", self.nTweak)
r += struct.pack("<B", self.nFlags)
return r
def __repr__(self):
return (
f"msg_filterload(data={self.data}, nHashFuncs={self.nHashFuncs}, "
f"nTweak={self.nTweak}, nFlags={self.nFlags})"
)
class msg_filteradd:
__slots__ = ("data")
msgtype = b"filteradd"
def __init__(self, data):
self.data = data
def deserialize(self, f):
self.data = deser_string(f)
def serialize(self):
r = b""
r += ser_string(self.data)
return r
def __repr__(self):
return f"msg_filteradd(data={self.data})"
class msg_filterclear:
__slots__ = ()
msgtype = b"filterclear"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_filterclear()"
class msg_feefilter:
__slots__ = ("feerate",)
msgtype = b"feefilter"
def __init__(self, feerate=0):
self.feerate = feerate
def deserialize(self, f):
self.feerate = struct.unpack("<Q", f.read(8))[0]
def serialize(self):
r = b""
r += struct.pack("<Q", self.feerate)
return r
def __repr__(self):
return f"msg_feefilter(feerate={self.feerate:08x})"
class msg_sendcmpct:
__slots__ = ("announce", "version")
msgtype = b"sendcmpct"
def __init__(self, announce=False, version=1):
self.announce = announce
self.version = version
def deserialize(self, f):
self.announce = struct.unpack("<?", f.read(1))[0]
self.version = struct.unpack("<Q", f.read(8))[0]
def serialize(self):
r = b""
r += struct.pack("<?", self.announce)
r += struct.pack("<Q", self.version)
return r
def __repr__(self):
return f"msg_sendcmpct(announce={self.announce}, version={self.version})"
class msg_cmpctblock:
__slots__ = ("header_and_shortids",)
msgtype = b"cmpctblock"
def __init__(self, header_and_shortids=None):
self.header_and_shortids = header_and_shortids
def deserialize(self, f):
self.header_and_shortids = P2PHeaderAndShortIDs()
self.header_and_shortids.deserialize(f)
def serialize(self):
r = b""
r += self.header_and_shortids.serialize()
return r
def __repr__(self):
return f"msg_cmpctblock(HeaderAndShortIDs={self.header_and_shortids!r})"
class msg_getblocktxn:
__slots__ = ("block_txn_request",)
msgtype = b"getblocktxn"
def __init__(self):
self.block_txn_request = None
def deserialize(self, f):
self.block_txn_request = BlockTransactionsRequest()
self.block_txn_request.deserialize(f)
def serialize(self):
r = b""
r += self.block_txn_request.serialize()
return r
def __repr__(self):
return f"msg_getblocktxn(block_txn_request={self.block_txn_request!r})"
class msg_blocktxn:
__slots__ = ("block_transactions",)
msgtype = b"blocktxn"
def __init__(self):
self.block_transactions = BlockTransactions()
def deserialize(self, f):
self.block_transactions.deserialize(f)
def serialize(self):
r = b""
r += self.block_transactions.serialize()
return r
def __repr__(self):
return f"msg_blocktxn(block_transactions={self.block_transactions!r})"
class msg_getcfilters:
__slots__ = ("filter_type", "start_height", "stop_hash")
msgtype = b"getcfilters"
def __init__(self, filter_type, start_height, stop_hash):
self.filter_type = filter_type
self.start_height = start_height
self.stop_hash = stop_hash
def deserialize(self, f):
self.filter_type = struct.unpack("<B", f.read(1))[0]
self.start_height = struct.unpack("<I", f.read(4))[0]
self.stop_hash = deser_uint256(f)
def serialize(self):
r = b""
r += struct.pack("<B", self.filter_type)
r += struct.pack("<I", self.start_height)
r += ser_uint256(self.stop_hash)
return r
def __repr__(self):
return (
f"msg_getcfilters(filter_type={self.filter_type:#x}, "
f"start_height={self.start_height}, stop_hash={self.stop_hash:x})"
)
class msg_cfilter:
__slots__ = ("filter_type", "block_hash", "filter_data")
msgtype = b"cfilter"
def __init__(self, filter_type=None, block_hash=None, filter_data=None):
self.filter_type = filter_type
self.block_hash = block_hash
self.filter_data = filter_data
def deserialize(self, f):
self.filter_type = struct.unpack("<B", f.read(1))[0]
self.block_hash = deser_uint256(f)
self.filter_data = deser_string(f)
def serialize(self):
r = b""
r += struct.pack("<B", self.filter_type)
r += ser_uint256(self.block_hash)
r += ser_string(self.filter_data)
return r
def __repr__(self):
return (
f"msg_cfilter(filter_type={self.filter_type:#x}, "
f"block_hash={self.block_hash:x})"
)
class msg_getcfheaders:
__slots__ = ("filter_type", "start_height", "stop_hash")
msgtype = b"getcfheaders"
def __init__(self, filter_type, start_height, stop_hash):
self.filter_type = filter_type
self.start_height = start_height
self.stop_hash = stop_hash
def deserialize(self, f):
self.filter_type = struct.unpack("<B", f.read(1))[0]
self.start_height = struct.unpack("<I", f.read(4))[0]
self.stop_hash = deser_uint256(f)
def serialize(self):
r = b""
r += struct.pack("<B", self.filter_type)
r += struct.pack("<I", self.start_height)
r += ser_uint256(self.stop_hash)
return r
def __repr__(self):
return (
f"msg_getcfheaders(filter_type={self.filter_type:#x}, "
f"start_height={self.start_height}, stop_hash={self.stop_hash:x})"
)
class msg_cfheaders:
__slots__ = ("filter_type", "stop_hash", "prev_header", "hashes")
msgtype = b"cfheaders"
def __init__(self, filter_type=None, stop_hash=None,
prev_header=None, hashes=None):
self.filter_type = filter_type
self.stop_hash = stop_hash
self.prev_header = prev_header
self.hashes = hashes
def deserialize(self, f):
self.filter_type = struct.unpack("<B", f.read(1))[0]
self.stop_hash = deser_uint256(f)
self.prev_header = deser_uint256(f)
self.hashes = deser_uint256_vector(f)
def serialize(self):
r = b""
r += struct.pack("<B", self.filter_type)
r += ser_uint256(self.stop_hash)
r += ser_uint256(self.prev_header)
r += ser_uint256_vector(self.hashes)
return r
def __repr__(self):
return (
f"msg_cfheaders(filter_type={self.filter_type:#x}, "
f"stop_hash={self.stop_hash:x})"
)
class msg_getcfcheckpt:
__slots__ = ("filter_type", "stop_hash")
msgtype = b"getcfcheckpt"
def __init__(self, filter_type, stop_hash):
self.filter_type = filter_type
self.stop_hash = stop_hash
def deserialize(self, f):
self.filter_type = struct.unpack("<B", f.read(1))[0]
self.stop_hash = deser_uint256(f)
def serialize(self):
r = b""
r += struct.pack("<B", self.filter_type)
r += ser_uint256(self.stop_hash)
return r
def __repr__(self):
return (
f"msg_getcfcheckpt(filter_type={self.filter_type:#x}, "
f"stop_hash={self.stop_hash:x})"
)
class msg_cfcheckpt:
__slots__ = ("filter_type", "stop_hash", "headers")
msgtype = b"cfcheckpt"
def __init__(self, filter_type=None, stop_hash=None, headers=None):
self.filter_type = filter_type
self.stop_hash = stop_hash
self.headers = headers
def deserialize(self, f):
self.filter_type = struct.unpack("<B", f.read(1))[0]
self.stop_hash = deser_uint256(f)
self.headers = deser_uint256_vector(f)
def serialize(self):
r = b""
r += struct.pack("<B", self.filter_type)
r += ser_uint256(self.stop_hash)
r += ser_uint256_vector(self.headers)
return r
def __repr__(self):
return (
f"msg_cfcheckpt(filter_type={self.filter_type:#x}, "
f"stop_hash={self.stop_hash:x})"
)
class msg_avaproof:
__slots__ = ("proof",)
msgtype = b"avaproof"
def __init__(self):
self.proof = AvalancheProof()
def deserialize(self, f):
self.proof.deserialize(f)
def serialize(self):
r = b""
r += self.proof.serialize()
return r
def __repr__(self):
return f"msg_avaproof(proof={self.proof!r})"
class msg_avapoll:
__slots__ = ("poll",)
msgtype = b"avapoll"
def __init__(self):
self.poll = AvalanchePoll()
def deserialize(self, f):
self.poll.deserialize(f)
def serialize(self):
r = b""
r += self.poll.serialize()
return r
def __repr__(self):
return f"msg_avapoll(poll={self.poll!r})"
class msg_avaresponse:
__slots__ = ("response",)
msgtype = b"avaresponse"
def __init__(self):
self.response = AvalancheResponse()
def deserialize(self, f):
self.response.deserialize(f)
def serialize(self):
r = b""
r += self.response.serialize()
return r
def __repr__(self):
return f"msg_avaresponse(response={self.response!r})"
class msg_tcpavaresponse:
__slots__ = ("response",)
msgtype = b"avaresponse"
def __init__(self):
self.response = TCPAvalancheResponse()
def deserialize(self, f):
self.response.deserialize(f)
def serialize(self):
r = b""
r += self.response.serialize()
return r
def __repr__(self):
return f"msg_tcpavaresponse(response={self.response!r})"
class msg_avahello:
__slots__ = ("hello",)
msgtype = b"avahello"
def __init__(self):
self.hello = AvalancheHello()
def deserialize(self, f):
self.hello.deserialize(f)
def serialize(self):
r = b""
r += self.hello.serialize()
return r
def __repr__(self):
return f"msg_avahello(response={self.hello!r})"
class msg_getavaaddr:
__slots__ = ()
msgtype = b"getavaaddr"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_getavaaddr()"
class msg_getavaproofs:
__slots__ = ()
msgtype = b"getavaproofs"
def __init__(self):
pass
def deserialize(self, f):
pass
def serialize(self):
return b""
def __repr__(self):
return "msg_getavaproofs()"
class msg_avaproofs:
__slots__ = ("key0", "key1", "shortids", "prefilled_proofs")
msgtype = b"avaproofs"
def __init__(self):
self.key0 = 0
self.key1 = 0
self.shortids = []
self.prefilled_proofs = []
def deserialize(self, f):
self.key0 = struct.unpack("<Q", f.read(8))[0]
self.key1 = struct.unpack("<Q", f.read(8))[0]
shortids_length = deser_compact_size(f)
for _ in range(shortids_length):
# shortids are defined to be 6 bytes in the spec, so append
# two zero bytes and read it in as an 8-byte number
self.shortids.append(
struct.unpack("<Q", f.read(6) + b'\x00\x00')[0])
# The indices are differentially encoded
self.prefilled_proofs = deser_vector(f, AvalanchePrefilledProof)
current_indice = -1
for p in self.prefilled_proofs:
current_indice += p.index + 1
p.index = current_indice
def serialize(self):
r = b""
r += struct.pack("<Q", self.key0)
r += struct.pack("<Q", self.key1)
r += ser_compact_size(len(self.shortids))
for shortid in self.shortids:
# We only want the first 6 bytes
r += struct.pack("<Q", shortid)[0:6]
r += ser_compact_size(len(self.prefilled_proofs))
if (len(self.prefilled_proofs) < 1):
return r
# The indices are differentially encoded
r += self.prefilled_proofs[0].serialize()
for i in range(len(self.prefilled_proofs[1:])):
r += ser_compact_size(
self.prefilled_proofs[i + 1].index - self.prefilled_proofs[i].index - 1)
r += self.prefilled_proofs[i].proof.serialize()
return r
def __repr__(self):
return (
f"msg_avaproofs(key0={self.key0}, key1={self.key1}, "
f"len(shortids)={len(self.shortids)}, shortids={self.shortids}), "
f"len(prefilled_proofs)={len(self.prefilled_proofs)}, "
f"prefilled_proofs={self.prefilled_proofs})"
)
class msg_avaproofsreq:
__slots__ = ("indices")
msgtype = b"avaproofsreq"
def __init__(self):
self.indices = []
def deserialize(self, f):
indices_length = deser_compact_size(f)
# The indices are differentially encoded
current_indice = -1
for _ in range(indices_length):
current_indice += deser_compact_size(f) + 1
self.indices.append(current_indice)
def serialize(self):
r = b""
r += ser_compact_size(len(self.indices))
if (len(self.indices) < 1):
return r
# The indices are differentially encoded
r += ser_compact_size(self.indices[0])
for i in range(len(self.indices[1:])):
r += ser_compact_size(self.indices[i + 1] - self.indices[i] - 1)
return r
def __repr__(self):
return (
f"msg_avaproofsreq(len(shortids)={len(self.indices)}, "
f"indices={self.indices})"
)
class TestFrameworkMessages(unittest.TestCase):
def test_avalanche_proof_serialization_round_trip(self):
"""Verify that an AvalancheProof object is unchanged after a round-trip
of deserialization-serialization.
"""
# Extracted from proof_tests.cpp
proof_hex = (
"d97587e6c882615796011ec8f9a7b1c621023beefdde700a6bc02036335b4df141"
"c8bc67bb05a971f5ac2745fd683797dde30169a79ff23e1d58c64afad42ad81cff"
"e53967e16beb692fc5776bb442c79c5d91de00cf21804712806594010038e168a3"
"2102449fb5237efe8f647d32e8b64f06c22d1d40368eaca2a71ffc6a13ecc8bce6"
"804534ca1f5e22670be3df5cbd5957d8dd83d05c8f17eae391f0e7ffdce4fb3def"
"adb7c079473ebeccf88c1f8ce87c61e451447b89c445967335ffd1aadef4299823"
"21023beefdde700a6bc02036335b4df141c8bc67bb05a971f5ac2745fd683797dd"
"e3ac7b0b7865200f63052ff980b93f965f398dda04917d411dd46e3c009a5fef35"
"661fac28779b6a22760c00004f5ddf7d9865c7fead7e4a840b947939590261640f"
)
avaproof = FromHex(AvalancheProof(), proof_hex)
self.assertEqual(ToHex(avaproof), proof_hex)
self.assertEqual(
uint256_hex(avaproof.proofid),
"455f34eb8a00b0799630071c0728481bdb1653035b1484ac33e974aa4ae7db6d"
)
self.assertEqual(avaproof.sequence, 6296457553413371353)
self.assertEqual(avaproof.expiration, -4129334692075929194)
self.assertEqual(avaproof.master, bytes.fromhex(
"023beefdde700a6bc02036335b4df141c8bc67bb05a971f5ac2745fd683797dde3"
))
# P2PK to master pubkey
# We can't use a CScript() here because it would cause a circular
# import
self.assertEqual(avaproof.payout_script, bytes.fromhex(
"21023beefdde700a6bc02036335b4df141c8bc67bb05a971f5ac2745fd683797dde3ac"))
self.assertEqual(avaproof.signature, b64decode(
"ewt4ZSAPYwUv+YC5P5ZfOY3aBJF9QR3UbjwAml/vNWYfrCh3m2oidgwAAE9d332YZcf+rX5KhAuUeTlZAmFkDw=="))
self.assertEqual(len(avaproof.stakes), 1)
self.assertEqual(avaproof.stakes[0].sig, b64decode(
"RTTKH14iZwvj31y9WVfY3YPQXI8X6uOR8Of/3OT7Pe+tt8B5Rz6+zPiMH4zofGHkUUR7icRFlnM1/9Gq3vQpmA=="))
- self.assertEqual(f"{avaproof.stakes[0].stake.utxo.hash:x}",
+ self.assertEqual(f"{avaproof.stakes[0].stake.utxo.txid:x}",
"915d9cc742b46b77c52f69eb6be16739e5ff1cd82ad4fa4ac6581d3ef29fa769"
)
self.assertEqual(avaproof.stakes[0].stake.utxo.n, 567214302)
self.assertEqual(avaproof.stakes[0].stake.amount, 444638638000000)
self.assertEqual(avaproof.stakes[0].stake.height, 1370779804)
self.assertEqual(avaproof.stakes[0].stake.is_coinbase, False)
self.assertEqual(avaproof.stakes[0].stake.pubkey, bytes.fromhex(
"02449fb5237efe8f647d32e8b64f06c22d1d40368eaca2a71ffc6a13ecc8bce680"
))
msg_proof = msg_avaproof()
msg_proof.proof = avaproof
self.assertEqual(ToHex(msg_proof), proof_hex)

File Metadata

Mime Type
text/x-diff
Expires
Sun, Apr 27, 10:27 (1 d, 5 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5573257
Default Alt Text
(119 KB)

Event Timeline