Changeset View
Changeset View
Standalone View
Standalone View
test/functional/chronik_spent_by.py
Show All 22 Lines | |||||
from test_framework.txtools import pad_tx | from test_framework.txtools import pad_tx | ||||
from test_framework.util import assert_equal | from test_framework.util import assert_equal | ||||
class ChronikSpentByTest(BitcoinTestFramework): | class ChronikSpentByTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.setup_clean_chain = True | self.setup_clean_chain = True | ||||
self.num_nodes = 1 | self.num_nodes = 1 | ||||
self.extra_args = [['-chronik']] | self.extra_args = [["-chronik"]] | ||||
self.rpc_timeout = 240 | self.rpc_timeout = 240 | ||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_chronik() | self.skip_if_no_chronik() | ||||
def run_test(self): | def run_test(self): | ||||
from test_framework.chronik.client import ChronikClient, pb | from test_framework.chronik.client import ChronikClient, pb | ||||
node = self.nodes[0] | node = self.nodes[0] | ||||
node.setmocktime(1300000000) | node.setmocktime(1300000000) | ||||
chronik = ChronikClient('127.0.0.1', node.chronik_port) | chronik = ChronikClient("127.0.0.1", node.chronik_port) | ||||
peer = node.add_p2p_connection(P2PDataStore()) | peer = node.add_p2p_connection(P2PDataStore()) | ||||
coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | ||||
coinblock = node.getblock(coinblockhash) | coinblock = node.getblock(coinblockhash) | ||||
cointx = coinblock['tx'][0] | cointx = coinblock["tx"][0] | ||||
tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] | tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] | ||||
coinvalue = 5000000000 | coinvalue = 5000000000 | ||||
send_values = [coinvalue - 10000, 1000, 1000, 1000] | send_values = [coinvalue - 10000, 1000, 1000, 1000] | ||||
send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] | send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] | ||||
send_script_hashes = [hash160(script) for script in send_redeem_scripts] | send_script_hashes = [hash160(script) for script in send_redeem_scripts] | ||||
send_scripts = [CScript([OP_HASH160, script_hash, OP_EQUAL]) | send_scripts = [ | ||||
for script_hash in send_script_hashes] | CScript([OP_HASH160, script_hash, OP_EQUAL]) | ||||
for script_hash in send_script_hashes | |||||
] | |||||
tx = CTransaction() | tx = CTransaction() | ||||
tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), | tx.vin = [ | ||||
scriptSig=SCRIPTSIG_OP_TRUE)] | CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) | ||||
tx.vout = [CTxOut(value, script) | ] | ||||
for (value, script) in zip(send_values, send_scripts)] | tx.vout = [ | ||||
CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) | |||||
] | |||||
tx.rehash() | tx.rehash() | ||||
# Submit tx to mempool | # Submit tx to mempool | ||||
txid = node.sendrawtransaction(tx.serialize().hex()) | txid = node.sendrawtransaction(tx.serialize().hex()) | ||||
def tx_outputs_spent(tx): | def tx_outputs_spent(tx): | ||||
return [output.spent_by for output in tx.outputs] | return [output.spent_by for output in tx.outputs] | ||||
def find_tx(txs): | def find_tx(txs): | ||||
return [tx for tx in txs if tx.txid[::-1].hex() == txid][0] | return [tx for tx in txs if tx.txid[::-1].hex() == txid][0] | ||||
def check_outputs_spent(expected_outpoints, *, has_been_mined): | def check_outputs_spent(expected_outpoints, *, has_been_mined): | ||||
assert_equal( | assert_equal( | ||||
tx_outputs_spent(chronik.tx(txid).ok()), | tx_outputs_spent(chronik.tx(txid).ok()), | ||||
expected_outpoints, | expected_outpoints, | ||||
) | ) | ||||
for script_hash in send_script_hashes: | for script_hash in send_script_hashes: | ||||
chronik_script = chronik.script('p2sh', script_hash.hex()) | chronik_script = chronik.script("p2sh", script_hash.hex()) | ||||
if has_been_mined: | if has_been_mined: | ||||
txs = chronik_script.confirmed_txs().ok() | txs = chronik_script.confirmed_txs().ok() | ||||
else: | else: | ||||
txs = chronik_script.unconfirmed_txs().ok() | txs = chronik_script.unconfirmed_txs().ok() | ||||
tx = find_tx(txs.txs) | tx = find_tx(txs.txs) | ||||
assert_equal(tx, find_tx(chronik_script.history().ok().txs)) | assert_equal(tx, find_tx(chronik_script.history().ok().txs)) | ||||
assert_equal( | assert_equal( | ||||
tx_outputs_spent(tx), | tx_outputs_spent(tx), | ||||
expected_outpoints, | expected_outpoints, | ||||
) | ) | ||||
# Initially, none of the outputs are spent | # Initially, none of the outputs are spent | ||||
check_outputs_spent([pb.SpentBy()] * len(send_values), has_been_mined=False) | check_outputs_spent([pb.SpentBy()] * len(send_values), has_been_mined=False) | ||||
# Add tx that spends the middle two outputs to mempool | # Add tx that spends the middle two outputs to mempool | ||||
tx2 = CTransaction() | tx2 = CTransaction() | ||||
tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), i + 1), | tx2.vin = [ | ||||
scriptSig=CScript([redeem_script])) | CTxIn( | ||||
for i, redeem_script in enumerate(send_redeem_scripts[1:3])] | outpoint=COutPoint(int(txid, 16), i + 1), | ||||
scriptSig=CScript([redeem_script]), | |||||
) | |||||
for i, redeem_script in enumerate(send_redeem_scripts[1:3]) | |||||
] | |||||
pad_tx(tx2) | pad_tx(tx2) | ||||
txid2 = node.sendrawtransaction(tx2.serialize().hex()) | txid2 = node.sendrawtransaction(tx2.serialize().hex()) | ||||
middle_two_spent = [ | middle_two_spent = [ | ||||
pb.SpentBy(), | pb.SpentBy(), | ||||
pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), | pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), | ||||
pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), | pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), | ||||
pb.SpentBy(), | pb.SpentBy(), | ||||
] | ] | ||||
check_outputs_spent(middle_two_spent, has_been_mined=False) | check_outputs_spent(middle_two_spent, has_been_mined=False) | ||||
# Mining both txs still works | # Mining both txs still works | ||||
block2 = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | block2 = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | ||||
check_outputs_spent(middle_two_spent, has_been_mined=True) | check_outputs_spent(middle_two_spent, has_been_mined=True) | ||||
# Add tx that also spends the last output to the mempool | # Add tx that also spends the last output to the mempool | ||||
tx3 = CTransaction() | tx3 = CTransaction() | ||||
tx3.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 3), | tx3.vin = [ | ||||
scriptSig=CScript([send_redeem_scripts[3]]))] | CTxIn( | ||||
outpoint=COutPoint(int(txid, 16), 3), | |||||
scriptSig=CScript([send_redeem_scripts[3]]), | |||||
) | |||||
] | |||||
pad_tx(tx3) | pad_tx(tx3) | ||||
txid3 = node.sendrawtransaction(tx3.serialize().hex()) | txid3 = node.sendrawtransaction(tx3.serialize().hex()) | ||||
# 2 outputs spent by a mined tx, 1 output spent by a mempool tx | # 2 outputs spent by a mined tx, 1 output spent by a mempool tx | ||||
last_three_spent = [ | last_three_spent = [ | ||||
pb.SpentBy(), | pb.SpentBy(), | ||||
pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), | pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), | ||||
pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), | pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), | ||||
Show All 14 Lines | def run_test(self): | ||||
check_outputs_spent(last_three_spent, has_been_mined=False) | check_outputs_spent(last_three_spent, has_been_mined=False) | ||||
# Mine a tx conflicting with tx3 | # Mine a tx conflicting with tx3 | ||||
tx3_conflict = CTransaction(tx3) | tx3_conflict = CTransaction(tx3) | ||||
tx3_conflict.nLockTime = 1 | tx3_conflict.nLockTime = 1 | ||||
tx3_conflict.rehash() | tx3_conflict.rehash() | ||||
# Block mines tx, tx2 and tx3_conflict | # Block mines tx, tx2 and tx3_conflict | ||||
block = create_block(int(tip, 16), | block = create_block( | ||||
create_coinbase(101, b'\x03' * 33), | int(tip, 16), create_coinbase(101, b"\x03" * 33), 1300000500 | ||||
1300000500) | ) | ||||
block.vtx += [tx, tx2, tx3_conflict] | block.vtx += [tx, tx2, tx3_conflict] | ||||
make_conform_to_ctor(block) | make_conform_to_ctor(block) | ||||
block.hashMerkleRoot = block.calc_merkle_root() | block.hashMerkleRoot = block.calc_merkle_root() | ||||
block.solve() | block.solve() | ||||
peer.send_blocks_and_test([block], node) | peer.send_blocks_and_test([block], node) | ||||
conflict_spent = [ | conflict_spent = [ | ||||
pb.SpentBy(), | pb.SpentBy(), | ||||
pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), | pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), | ||||
pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), | pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), | ||||
pb.SpentBy(txid=bytes.fromhex(tx3_conflict.hash)[::-1], input_idx=0), | pb.SpentBy(txid=bytes.fromhex(tx3_conflict.hash)[::-1], input_idx=0), | ||||
] | ] | ||||
check_outputs_spent(conflict_spent, has_been_mined=True) | check_outputs_spent(conflict_spent, has_been_mined=True) | ||||
if __name__ == '__main__': | if __name__ == "__main__": | ||||
ChronikSpentByTest().main() | ChronikSpentByTest().main() |