Changeset View
Changeset View
Standalone View
Standalone View
test/functional/wallet_import_rescan.py
Show All 33 Lines | |||||
Call = enum.Enum("Call", "single multiaddress multiscript") | Call = enum.Enum("Call", "single multiaddress multiscript") | ||||
Data = enum.Enum("Data", "address pub priv") | Data = enum.Enum("Data", "address pub priv") | ||||
Rescan = enum.Enum("Rescan", "no yes late_timestamp") | Rescan = enum.Enum("Rescan", "no yes late_timestamp") | ||||
class Variant(collections.namedtuple("Variant", "call data rescan prune")): | class Variant(collections.namedtuple("Variant", "call data rescan prune")): | ||||
"""Helper for importing one key and verifying scanned transactions.""" | """Helper for importing one key and verifying scanned transactions.""" | ||||
def do_import(self, timestamp): | def do_import(self, timestamp): | ||||
"""Call one key import RPC.""" | """Call one key import RPC.""" | ||||
rescan = self.rescan == Rescan.yes | rescan = self.rescan == Rescan.yes | ||||
assert_equal(self.address["solvable"], True) | |||||
if self.call == Call.single: | if self.call == Call.single: | ||||
if self.data == Data.address: | if self.data == Data.address: | ||||
response = self.node.importaddress( | response = self.node.importaddress( | ||||
address=self.address["address"], label=self.label, rescan=rescan) | address=self.address["address"], label=self.label, rescan=rescan) | ||||
elif self.data == Data.pub: | elif self.data == Data.pub: | ||||
response = self.node.importpubkey( | response = self.node.importpubkey( | ||||
pubkey=self.address["pubkey"], label=self.label, rescan=rescan) | pubkey=self.address["pubkey"], label=self.label, rescan=rescan) | ||||
elif self.data == Data.priv: | elif self.data == Data.priv: | ||||
response = self.node.importprivkey( | response = self.node.importprivkey( | ||||
privkey=self.key, label=self.label, rescan=rescan) | privkey=self.key, label=self.label, rescan=rescan) | ||||
assert_equal(response, None) | assert_equal(response, None) | ||||
elif self.call in (Call.multiaddress, Call.multiscript): | elif self.call in (Call.multiaddress, Call.multiscript): | ||||
response = self.node.importmulti([{ | request = { | ||||
"scriptPubKey": { | "scriptPubKey": { | ||||
"address": self.address["address"] | "address": self.address["address"] | ||||
} if self.call == Call.multiaddress else self.address["scriptPubKey"], | } if self.call == Call.multiaddress else self.address["scriptPubKey"], | ||||
"timestamp": timestamp + TIMESTAMP_WINDOW + (1 if self.rescan == Rescan.late_timestamp else 0), | "timestamp": timestamp + TIMESTAMP_WINDOW + ( | ||||
1 if self.rescan == Rescan.late_timestamp else 0), | |||||
"pubkeys": [self.address["pubkey"]] if self.data == Data.pub else [], | "pubkeys": [self.address["pubkey"]] if self.data == Data.pub else [], | ||||
"keys": [self.key] if self.data == Data.priv else [], | "keys": [self.key] if self.data == Data.priv else [], | ||||
"label": self.label, | "label": self.label, | ||||
"watchonly": self.data != Data.priv | "watchonly": self.data != Data.priv | ||||
}], {"rescan": self.rescan in (Rescan.yes, Rescan.late_timestamp)}) | } | ||||
response = self.node.importmulti( | |||||
requests=[request], | |||||
options={ | |||||
"rescan": self.rescan in ( | |||||
Rescan.yes, | |||||
Rescan.late_timestamp)}, | |||||
) | |||||
assert_equal(response, [{"success": True}]) | assert_equal(response, [{"success": True}]) | ||||
def check(self, txid=None, amount=None, confirmation_height=None): | def check(self, txid=None, amount=None, confirmation_height=None): | ||||
"""Verify that listtransactions/listreceivedbyaddress return expected values.""" | """Verify that listtransactions/listreceivedbyaddress return expected values.""" | ||||
txs = self.node.listtransactions( | txs = self.node.listtransactions( | ||||
label=self.label, count=10000, include_watchonly=True) | label=self.label, count=10000, include_watchonly=True) | ||||
current_height = self.node.getblockcount() | current_height = self.node.getblockcount() | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | |||||
class ImportRescanTest(BitcoinTestFramework): | class ImportRescanTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.num_nodes = 2 + len(IMPORT_NODES) | self.num_nodes = 2 + len(IMPORT_NODES) | ||||
def skip_test_if_missing_module(self): | def skip_test_if_missing_module(self): | ||||
self.skip_if_no_wallet() | self.skip_if_no_wallet() | ||||
def setup_network(self): | def setup_network(self): | ||||
extra_args = [[] for _ in range(self.num_nodes)] | self.extra_args = [[] for _ in range(self.num_nodes)] | ||||
for i, import_node in enumerate(IMPORT_NODES, 2): | for i, import_node in enumerate(IMPORT_NODES, 2): | ||||
if import_node.prune: | if import_node.prune: | ||||
extra_args[i] += ["-prune=1"] | self.extra_args[i] += ["-prune=1"] | ||||
self.add_nodes(self.num_nodes, extra_args=extra_args) | self.add_nodes(self.num_nodes, extra_args=self.extra_args) | ||||
# Import keys with pruning disabled | # Import keys with pruning disabled | ||||
self.start_nodes(extra_args=[[]] * self.num_nodes) | self.start_nodes(extra_args=[[]] * self.num_nodes) | ||||
for n in self.nodes: | for n in self.nodes: | ||||
n.importprivkey( | n.importprivkey( | ||||
privkey=n.get_deterministic_priv_key().key, | privkey=n.get_deterministic_priv_key().key, | ||||
label='coinbase') | label='coinbase') | ||||
self.stop_nodes() | self.stop_nodes() | ||||
self.start_nodes() | self.start_nodes() | ||||
for i in range(1, self.num_nodes): | for i in range(1, self.num_nodes): | ||||
connect_nodes(self.nodes[i], self.nodes[0]) | connect_nodes(self.nodes[i], self.nodes[0]) | ||||
def run_test(self): | def run_test(self): | ||||
# Create one transaction on node 0 with a unique amount for | # Create one transaction on node 0 with a unique amount for | ||||
# each possible type of wallet import RPC. | # each possible type of wallet import RPC. | ||||
for i, variant in enumerate(IMPORT_VARIANTS): | for i, variant in enumerate(IMPORT_VARIANTS): | ||||
variant.label = "label {} {}".format(i, variant) | variant.label = "label {} {}".format(i, variant) | ||||
variant.address = self.nodes[1].getaddressinfo( | variant.address = self.nodes[1].getaddressinfo( | ||||
self.nodes[1].getnewaddress(variant.label)) | self.nodes[1].getnewaddress(label=variant.label)) | ||||
variant.key = self.nodes[1].dumpprivkey(variant.address["address"]) | variant.key = self.nodes[1].dumpprivkey(variant.address["address"]) | ||||
variant.initial_amount = get_rand_amount() | variant.initial_amount = get_rand_amount() | ||||
variant.initial_txid = self.nodes[0].sendtoaddress( | variant.initial_txid = self.nodes[0].sendtoaddress( | ||||
variant.address["address"], variant.initial_amount) | variant.address["address"], variant.initial_amount) | ||||
# Generate one block for each send | # Generate one block for each send | ||||
self.nodes[0].generate(1) | self.nodes[0].generate(1) | ||||
variant.confirmation_height = self.nodes[0].getblockcount() | variant.confirmation_height = self.nodes[0].getblockcount() | ||||
variant.timestamp = self.nodes[0].getblockheader( | variant.timestamp = self.nodes[0].getblockheader( | ||||
▲ Show 20 Lines • Show All 50 Lines • Show Last 20 Lines |