Changeset View
Changeset View
Standalone View
Standalone View
test/functional/chronik_tx.py
Show All 18 Lines | |||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import assert_equal | from test_framework.util import assert_equal | ||||
class ChronikTxTest(BitcoinTestFramework): | class ChronikTxTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.setup_clean_chain = True | self.setup_clean_chain = True | ||||
self.num_nodes = 1 | self.num_nodes = 1 | ||||
self.extra_args = [['-chronik']] | self.extra_args = [["-chronik"]] | ||||
self.rpc_timeout = 240 | self.rpc_timeout = 240 | ||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_chronik() | self.skip_if_no_chronik() | ||||
def run_test(self): | def run_test(self): | ||||
from test_framework.chronik.client import ChronikClient, pb | from test_framework.chronik.client import ChronikClient, pb | ||||
from test_framework.chronik.test_data import genesis_cb_tx | from test_framework.chronik.test_data import genesis_cb_tx | ||||
node = self.nodes[0] | node = self.nodes[0] | ||||
chronik = ChronikClient('127.0.0.1', node.chronik_port) | chronik = ChronikClient("127.0.0.1", node.chronik_port) | ||||
peer = node.add_p2p_connection(P2PDataStore()) | peer = node.add_p2p_connection(P2PDataStore()) | ||||
node.setmocktime(1333333337) | node.setmocktime(1333333337) | ||||
assert_equal(chronik.tx('0').err(400).msg, '400: Not a txid: 0') | assert_equal(chronik.tx("0").err(400).msg, "400: Not a txid: 0") | ||||
assert_equal(chronik.tx('123').err(400).msg, '400: Not a txid: 123') | assert_equal(chronik.tx("123").err(400).msg, "400: Not a txid: 123") | ||||
assert_equal(chronik.tx('1234f').err(400).msg, '400: Not a txid: 1234f') | assert_equal(chronik.tx("1234f").err(400).msg, "400: Not a txid: 1234f") | ||||
assert_equal(chronik.tx('00' * 31).err(400).msg, f'400: Not a txid: {"00"*31}') | assert_equal(chronik.tx("00" * 31).err(400).msg, f'400: Not a txid: {"00"*31}') | ||||
assert_equal(chronik.tx('01').err(400).msg, '400: Not a txid: 01') | assert_equal(chronik.tx("01").err(400).msg, "400: Not a txid: 01") | ||||
assert_equal(chronik.tx('12345678901').err(400).msg, | assert_equal( | ||||
'400: Not a txid: 12345678901') | chronik.tx("12345678901").err(400).msg, "400: Not a txid: 12345678901" | ||||
) | |||||
assert_equal(chronik.tx('00' * 32).err(404).msg, | assert_equal( | ||||
f'404: Transaction {"00"*32} not found in the index') | chronik.tx("00" * 32).err(404).msg, | ||||
f'404: Transaction {"00"*32} not found in the index', | |||||
) | |||||
# Verify queried genesis tx matches | # Verify queried genesis tx matches | ||||
assert_equal(chronik.tx(GENESIS_CB_TXID).ok(), genesis_cb_tx()) | assert_equal(chronik.tx(GENESIS_CB_TXID).ok(), genesis_cb_tx()) | ||||
coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | ||||
coinblock = node.getblock(coinblockhash) | coinblock = node.getblock(coinblockhash) | ||||
cointx = coinblock['tx'][0] | cointx = coinblock["tx"][0] | ||||
self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) | self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) | ||||
coinvalue = 5000000000 | coinvalue = 5000000000 | ||||
send_values = [coinvalue - 10000, 1000, 2000, 3000] | send_values = [coinvalue - 10000, 1000, 2000, 3000] | ||||
send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] | send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] | ||||
send_scripts = [CScript([OP_HASH160, hash160(redeem_script), OP_EQUAL]) | send_scripts = [ | ||||
for redeem_script in send_redeem_scripts] | CScript([OP_HASH160, hash160(redeem_script), OP_EQUAL]) | ||||
for redeem_script in send_redeem_scripts | |||||
] | |||||
tx = CTransaction() | tx = CTransaction() | ||||
tx.nVersion = 2 | tx.nVersion = 2 | ||||
tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), | tx.vin = [ | ||||
CTxIn( | |||||
outpoint=COutPoint(int(cointx, 16), 0), | |||||
scriptSig=SCRIPTSIG_OP_TRUE, | scriptSig=SCRIPTSIG_OP_TRUE, | ||||
nSequence=0xfffffffe)] | nSequence=0xFFFFFFFE, | ||||
tx.vout = [CTxOut(value, script) | ) | ||||
for (value, script) in zip(send_values, send_scripts)] | ] | ||||
tx.vout = [ | |||||
CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) | |||||
] | |||||
tx.nLockTime = 1234567890 | tx.nLockTime = 1234567890 | ||||
# Submit tx to mempool | # Submit tx to mempool | ||||
txid = node.sendrawtransaction(tx.serialize().hex()) | txid = node.sendrawtransaction(tx.serialize().hex()) | ||||
proto_tx = pb.Tx( | proto_tx = pb.Tx( | ||||
txid=bytes.fromhex(txid)[::-1], | txid=bytes.fromhex(txid)[::-1], | ||||
version=tx.nVersion, | version=tx.nVersion, | ||||
inputs=[pb.TxInput( | inputs=[ | ||||
pb.TxInput( | |||||
prev_out=pb.OutPoint(txid=bytes.fromhex(cointx)[::-1], out_idx=0), | prev_out=pb.OutPoint(txid=bytes.fromhex(cointx)[::-1], out_idx=0), | ||||
input_script=bytes(tx.vin[0].scriptSig), | input_script=bytes(tx.vin[0].scriptSig), | ||||
output_script=bytes(P2SH_OP_TRUE), | output_script=bytes(P2SH_OP_TRUE), | ||||
value=coinvalue, | value=coinvalue, | ||||
sequence_no=0xfffffffe, | sequence_no=0xFFFFFFFE, | ||||
)], | ) | ||||
outputs=[pb.TxOutput( | ], | ||||
outputs=[ | |||||
pb.TxOutput( | |||||
value=value, | value=value, | ||||
output_script=bytes(script), | output_script=bytes(script), | ||||
) for value, script in zip(send_values, send_scripts)], | ) | ||||
for value, script in zip(send_values, send_scripts) | |||||
], | |||||
lock_time=1234567890, | lock_time=1234567890, | ||||
block=None, | block=None, | ||||
time_first_seen=1333333337, | time_first_seen=1333333337, | ||||
size=len(tx.serialize()), | size=len(tx.serialize()), | ||||
is_coinbase=False, | is_coinbase=False, | ||||
) | ) | ||||
assert_equal(chronik.tx(txid).ok(), proto_tx) | assert_equal(chronik.tx(txid).ok(), proto_tx) | ||||
# If we mine the block, querying will gives us all the tx details + block | # If we mine the block, querying will gives us all the tx details + block | ||||
txblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | txblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | ||||
# Set the `block` field, now that we mined it | # Set the `block` field, now that we mined it | ||||
proto_tx.block.CopyFrom(pb.BlockMetadata( | proto_tx.block.CopyFrom( | ||||
pb.BlockMetadata( | |||||
hash=bytes.fromhex(txblockhash)[::-1], | hash=bytes.fromhex(txblockhash)[::-1], | ||||
height=102, | height=102, | ||||
timestamp=1333333355, | timestamp=1333333355, | ||||
)) | ) | ||||
) | |||||
assert_equal(chronik.tx(txid).ok(), proto_tx) | assert_equal(chronik.tx(txid).ok(), proto_tx) | ||||
node.setmocktime(1333333338) | node.setmocktime(1333333338) | ||||
tx2 = CTransaction() | tx2 = CTransaction() | ||||
tx2.nVersion = 2 | tx2.nVersion = 2 | ||||
tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), i), | tx2.vin = [ | ||||
CTxIn( | |||||
outpoint=COutPoint(int(txid, 16), i), | |||||
scriptSig=CScript([redeem_script]), | scriptSig=CScript([redeem_script]), | ||||
nSequence=0xfffffff0 + i) | nSequence=0xFFFFFFF0 + i, | ||||
for i, redeem_script in enumerate(send_redeem_scripts)] | ) | ||||
for i, redeem_script in enumerate(send_redeem_scripts) | |||||
] | |||||
tx2.vout = [CTxOut(coinvalue - 20000, send_scripts[0])] | tx2.vout = [CTxOut(coinvalue - 20000, send_scripts[0])] | ||||
tx2.nLockTime = 12 | tx2.nLockTime = 12 | ||||
# Submit tx to mempool | # Submit tx to mempool | ||||
txid2 = node.sendrawtransaction(tx2.serialize().hex()) | txid2 = node.sendrawtransaction(tx2.serialize().hex()) | ||||
proto_tx2 = pb.Tx( | proto_tx2 = pb.Tx( | ||||
txid=bytes.fromhex(txid2)[::-1], | txid=bytes.fromhex(txid2)[::-1], | ||||
version=tx2.nVersion, | version=tx2.nVersion, | ||||
inputs=[ | inputs=[ | ||||
pb.TxInput( | pb.TxInput( | ||||
prev_out=pb.OutPoint(txid=bytes.fromhex(txid)[::-1], out_idx=i), | prev_out=pb.OutPoint(txid=bytes.fromhex(txid)[::-1], out_idx=i), | ||||
input_script=bytes(tx2.vin[i].scriptSig), | input_script=bytes(tx2.vin[i].scriptSig), | ||||
output_script=bytes(script), | output_script=bytes(script), | ||||
value=value, | value=value, | ||||
sequence_no=0xfffffff0 + i, | sequence_no=0xFFFFFFF0 + i, | ||||
) | ) | ||||
for i, (value, script) in enumerate(zip(send_values, send_scripts)) | for i, (value, script) in enumerate(zip(send_values, send_scripts)) | ||||
], | ], | ||||
outputs=[pb.TxOutput( | outputs=[ | ||||
pb.TxOutput( | |||||
value=tx2.vout[0].nValue, | value=tx2.vout[0].nValue, | ||||
output_script=bytes(tx2.vout[0].scriptPubKey), | output_script=bytes(tx2.vout[0].scriptPubKey), | ||||
)], | ) | ||||
], | |||||
lock_time=12, | lock_time=12, | ||||
block=None, | block=None, | ||||
time_first_seen=1333333338, | time_first_seen=1333333338, | ||||
size=len(tx2.serialize()), | size=len(tx2.serialize()), | ||||
is_coinbase=False, | is_coinbase=False, | ||||
) | ) | ||||
assert_equal(chronik.tx(txid2).ok(), proto_tx2) | assert_equal(chronik.tx(txid2).ok(), proto_tx2) | ||||
# Mine tx | # Mine tx | ||||
tx2blockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | tx2blockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | ||||
# Invalidate block | # Invalidate block | ||||
node.invalidateblock(tx2blockhash) | node.invalidateblock(tx2blockhash) | ||||
# Tx back in mempool | # Tx back in mempool | ||||
assert_equal(chronik.tx(txid2).ok(), proto_tx2) | assert_equal(chronik.tx(txid2).ok(), proto_tx2) | ||||
# Mine conflicting tx | # Mine conflicting tx | ||||
conflict_tx = CTransaction(tx2) | conflict_tx = CTransaction(tx2) | ||||
conflict_tx.nLockTime = 13 | conflict_tx.nLockTime = 13 | ||||
block = create_block(int(txblockhash, 16), | block = create_block( | ||||
create_coinbase(103, b'\x03' * 33), | int(txblockhash, 16), create_coinbase(103, b"\x03" * 33), 1333333500 | ||||
1333333500) | ) | ||||
block.vtx += [conflict_tx] | block.vtx += [conflict_tx] | ||||
block.hashMerkleRoot = block.calc_merkle_root() | block.hashMerkleRoot = block.calc_merkle_root() | ||||
block.solve() | block.solve() | ||||
peer.send_blocks_and_test([block], node) | peer.send_blocks_and_test([block], node) | ||||
assert_equal(chronik.tx(txid2).err(404).msg, | assert_equal( | ||||
f'404: Transaction {txid2} not found in the index') | chronik.tx(txid2).err(404).msg, | ||||
f"404: Transaction {txid2} not found in the index", | |||||
) | |||||
proto_tx2.txid = bytes.fromhex(conflict_tx.hash)[::-1] | proto_tx2.txid = bytes.fromhex(conflict_tx.hash)[::-1] | ||||
proto_tx2.lock_time = 13 | proto_tx2.lock_time = 13 | ||||
proto_tx2.time_first_seen = 0 | proto_tx2.time_first_seen = 0 | ||||
proto_tx2.block.CopyFrom(pb.BlockMetadata( | proto_tx2.block.CopyFrom( | ||||
pb.BlockMetadata( | |||||
hash=bytes.fromhex(block.hash)[::-1], | hash=bytes.fromhex(block.hash)[::-1], | ||||
height=103, | height=103, | ||||
timestamp=1333333500, | timestamp=1333333500, | ||||
)) | ) | ||||
) | |||||
assert_equal(chronik.tx(conflict_tx.hash).ok(), proto_tx2) | assert_equal(chronik.tx(conflict_tx.hash).ok(), proto_tx2) | ||||
if __name__ == '__main__': | if __name__ == "__main__": | ||||
ChronikTxTest().main() | ChronikTxTest().main() |