Changeset View
Changeset View
Standalone View
Standalone View
test/functional/chronik_avalanche.py
Show All 18 Lines | |||||
class ChronikAvalancheTest(BitcoinTestFramework): | class ChronikAvalancheTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.setup_clean_chain = True | self.setup_clean_chain = True | ||||
self.num_nodes = 1 | self.num_nodes = 1 | ||||
self.extra_args = [ | self.extra_args = [ | ||||
[ | [ | ||||
'-avaproofstakeutxodustthreshold=1000000', | "-avaproofstakeutxodustthreshold=1000000", | ||||
'-avaproofstakeutxoconfirmations=1', | "-avaproofstakeutxoconfirmations=1", | ||||
'-avacooldown=0', | "-avacooldown=0", | ||||
'-avaminquorumstake=0', | "-avaminquorumstake=0", | ||||
'-avaminavaproofsnodecount=0', | "-avaminavaproofsnodecount=0", | ||||
'-chronik', | "-chronik", | ||||
'-whitelist=noban@127.0.0.1', | "-whitelist=noban@127.0.0.1", | ||||
], | ], | ||||
] | ] | ||||
self.supports_cli = False | self.supports_cli = False | ||||
self.rpc_timeout = 240 | self.rpc_timeout = 240 | ||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_chronik() | self.skip_if_no_chronik() | ||||
def run_test(self): | def run_test(self): | ||||
from test_framework.chronik.client import ChronikClient | from test_framework.chronik.client import ChronikClient | ||||
node = self.nodes[0] | node = self.nodes[0] | ||||
chronik = ChronikClient('127.0.0.1', node.chronik_port) | chronik = ChronikClient("127.0.0.1", node.chronik_port) | ||||
# Build a fake quorum of nodes. | # Build a fake quorum of nodes. | ||||
def get_quorum(): | def get_quorum(): | ||||
return [get_ava_p2p_interface(self, node) | return [ | ||||
for _ in range(0, QUORUM_NODE_COUNT)] | get_ava_p2p_interface(self, node) for _ in range(0, QUORUM_NODE_COUNT) | ||||
] | |||||
def has_finalized_tip(tip_expected): | def has_finalized_tip(tip_expected): | ||||
hash_tip_final = int(tip_expected, 16) | hash_tip_final = int(tip_expected, 16) | ||||
can_find_inv_in_poll(quorum, hash_tip_final) | can_find_inv_in_poll(quorum, hash_tip_final) | ||||
return node.isfinalblock(tip_expected) | return node.isfinalblock(tip_expected) | ||||
# Generate us a coin | # Generate us a coin | ||||
coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] | ||||
coinblock = node.getblock(coinblockhash) | coinblock = node.getblock(coinblockhash) | ||||
cointx = coinblock['tx'][0] | cointx = coinblock["tx"][0] | ||||
# Mature coin | # Mature coin | ||||
self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) | self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) | ||||
# Pick one node from the quorum for polling. | # Pick one node from the quorum for polling. | ||||
quorum = get_quorum() | quorum = get_quorum() | ||||
assert node.getavalancheinfo()['ready_to_poll'] is True | assert node.getavalancheinfo()["ready_to_poll"] is True | ||||
# Build tx to finalize in a block | # Build tx to finalize in a block | ||||
coinvalue = 5000000000 | coinvalue = 5000000000 | ||||
tx = CTransaction() | tx = CTransaction() | ||||
tx.nVersion = 2 | tx.nVersion = 2 | ||||
tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), | tx.vin = [ | ||||
CTxIn( | |||||
outpoint=COutPoint(int(cointx, 16), 0), | |||||
scriptSig=SCRIPTSIG_OP_TRUE, | scriptSig=SCRIPTSIG_OP_TRUE, | ||||
nSequence=0xffffffff)] | nSequence=0xFFFFFFFF, | ||||
tx.vout = [CTxOut(nValue=coinvalue - 10000, | ) | ||||
scriptPubKey=CScript([OP_RETURN, bytes(100)]))] | ] | ||||
tx.vout = [ | |||||
CTxOut( | |||||
nValue=coinvalue - 10000, scriptPubKey=CScript([OP_RETURN, bytes(100)]) | |||||
) | |||||
] | |||||
# Add to mempool | # Add to mempool | ||||
txid = node.sendrawtransaction(tx.serialize().hex()) | txid = node.sendrawtransaction(tx.serialize().hex()) | ||||
# Tx not finalized | # Tx not finalized | ||||
assert_equal(chronik.tx(txid).ok().block.is_final, False) | assert_equal(chronik.tx(txid).ok().block.is_final, False) | ||||
# Mine block | # Mine block | ||||
tip = self.generate(node, 1)[-1] | tip = self.generate(node, 1)[-1] | ||||
# Not finalized yet | # Not finalized yet | ||||
assert_equal(chronik.block(tip).ok().block_info.is_final, False) | assert_equal(chronik.block(tip).ok().block_info.is_final, False) | ||||
assert_equal(chronik.tx(txid).ok().block.is_final, False) | assert_equal(chronik.tx(txid).ok().block.is_final, False) | ||||
# After we wait, both block and tx are finalized | # After we wait, both block and tx are finalized | ||||
self.wait_until(lambda: has_finalized_tip(tip)) | self.wait_until(lambda: has_finalized_tip(tip)) | ||||
assert_equal(chronik.block(tip).ok().block_info.is_final, True) | assert_equal(chronik.block(tip).ok().block_info.is_final, True) | ||||
assert_equal(chronik.tx(txid).ok().block.is_final, True) | assert_equal(chronik.tx(txid).ok().block.is_final, True) | ||||
# Restarting "wipes" the finalization status of blocks... | # Restarting "wipes" the finalization status of blocks... | ||||
self.restart_node(0, self.extra_args[0] + ['-chronikreindex']) | self.restart_node(0, self.extra_args[0] + ["-chronikreindex"]) | ||||
assert_equal(chronik.block(tip).ok().block_info.is_final, False) | assert_equal(chronik.block(tip).ok().block_info.is_final, False) | ||||
assert_equal(chronik.tx(txid).ok().block.is_final, False) | assert_equal(chronik.tx(txid).ok().block.is_final, False) | ||||
# ...so we establish a new quorum and poll again | # ...so we establish a new quorum and poll again | ||||
quorum = get_quorum() | quorum = get_quorum() | ||||
self.wait_until(lambda: has_finalized_tip(tip)) | self.wait_until(lambda: has_finalized_tip(tip)) | ||||
assert_equal(chronik.block(tip).ok().block_info.is_final, True) | assert_equal(chronik.block(tip).ok().block_info.is_final, True) | ||||
assert_equal(chronik.tx(txid).ok().block.is_final, True) | assert_equal(chronik.tx(txid).ok().block.is_final, True) | ||||
Show All 16 Lines | def run_test(self): | ||||
# Have to mine another block until avalanche considers reconsidering | # Have to mine another block until avalanche considers reconsidering | ||||
self.generate(node, 1) | self.generate(node, 1) | ||||
self.wait_until(lambda: has_finalized_tip(new_block_hashes[-1])) | self.wait_until(lambda: has_finalized_tip(new_block_hashes[-1])) | ||||
for block_hash in new_block_hashes: | for block_hash in new_block_hashes: | ||||
assert_equal(chronik.block(block_hash).ok().block_info.is_final, True) | assert_equal(chronik.block(block_hash).ok().block_info.is_final, True) | ||||
if __name__ == '__main__': | if __name__ == "__main__": | ||||
ChronikAvalancheTest().main() | ChronikAvalancheTest().main() |