Changeset View
Changeset View
Standalone View
Standalone View
test/functional/feature_notifications.py
Show All 35 Lines | def setup_network(self): | ||||
"-walletnotify=echo %s >> {}".format(self.tx_filename)]] | "-walletnotify=echo %s >> {}".format(self.tx_filename)]] | ||||
super().setup_network() | super().setup_network() | ||||
def run_test(self): | def run_test(self): | ||||
self.log.info("test -blocknotify") | self.log.info("test -blocknotify") | ||||
block_count = 10 | block_count = 10 | ||||
blocks = self.nodes[1].generate(block_count) | blocks = self.nodes[1].generate(block_count) | ||||
# wait at most 10 seconds for expected file size before reading the content | # wait at most 10 seconds for expected file size before reading the | ||||
# content | |||||
wait_until(lambda: os.path.isfile(self.block_filename) and os.stat( | wait_until(lambda: os.path.isfile(self.block_filename) and os.stat( | ||||
self.block_filename).st_size >= (block_count * 65), timeout=10) | self.block_filename).st_size >= (block_count * 65), timeout=10) | ||||
# file content should equal the generated blocks hashes | # file content should equal the generated blocks hashes | ||||
with open(self.block_filename, 'r', encoding="utf-8") as f: | with open(self.block_filename, 'r', encoding="utf-8") as f: | ||||
assert_equal(sorted(blocks), sorted(l.strip() | assert_equal(sorted(blocks), sorted(l.strip() | ||||
for l in f.read().splitlines())) | for l in f.read().splitlines())) | ||||
self.log.info("test -walletnotify") | self.log.info("test -walletnotify") | ||||
# wait at most 10 seconds for expected file size before reading the content | # wait at most 10 seconds for expected file size before reading the | ||||
# content | |||||
wait_until(lambda: os.path.isfile(self.tx_filename) and os.stat( | wait_until(lambda: os.path.isfile(self.tx_filename) and os.stat( | ||||
self.tx_filename).st_size >= (block_count * 65), timeout=10) | self.tx_filename).st_size >= (block_count * 65), timeout=10) | ||||
# file content should equal the generated transaction hashes | # file content should equal the generated transaction hashes | ||||
txids_rpc = list( | txids_rpc = list( | ||||
map(lambda t: t['txid'], self.nodes[1].listtransactions("*", block_count))) | map(lambda t: t['txid'], self.nodes[1].listtransactions("*", block_count))) | ||||
with open(self.tx_filename, 'r', encoding="ascii") as f: | with open(self.tx_filename, 'r', encoding="ascii") as f: | ||||
assert_equal(sorted(txids_rpc), sorted(l.strip() | assert_equal(sorted(txids_rpc), sorted(l.strip() | ||||
Show All 21 Lines | def run_test(self): | ||||
self.nodes[0].generate(1) | self.nodes[0].generate(1) | ||||
invalid_block = self.nodes[0].getbestblockhash() | invalid_block = self.nodes[0].getbestblockhash() | ||||
self.nodes[0].generate(7) | self.nodes[0].generate(7) | ||||
# Invalidate a large branch, which should trigger an alert. | # Invalidate a large branch, which should trigger an alert. | ||||
self.nodes[0].invalidateblock(invalid_block) | self.nodes[0].invalidateblock(invalid_block) | ||||
# Give bitcoind 10 seconds to write the alert notification | # Give bitcoind 10 seconds to write the alert notification | ||||
wait_until(lambda: os.path.isfile(self.alert_filename) | wait_until(lambda: os.path.isfile(self.alert_filename) and | ||||
and os.path.getsize(self.alert_filename), timeout=10) | os.path.getsize(self.alert_filename), timeout=10) | ||||
self.log.info(self.alert_filename) | self.log.info(self.alert_filename) | ||||
with open(self.alert_filename, 'r', encoding='utf8') as f: | with open(self.alert_filename, 'r', encoding='utf8') as f: | ||||
assert_equal(f.read(), (FORK_WARNING_MESSAGE.format(fork_block))) | assert_equal(f.read(), (FORK_WARNING_MESSAGE.format(fork_block))) | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
NotificationsTest().main() | NotificationsTest().main() |