Changeset View
Changeset View
Standalone View
Standalone View
test/functional/rpc_bind.py
Show All 20 Lines | def setup_network(self): | ||||
self.add_nodes(self.num_nodes, None) | self.add_nodes(self.num_nodes, None) | ||||
def run_bind_test(self, allow_ips, connect_to, addresses, expected): | def run_bind_test(self, allow_ips, connect_to, addresses, expected): | ||||
''' | ''' | ||||
Start a node with requested rpcallowip and rpcbind parameters, | Start a node with requested rpcallowip and rpcbind parameters, | ||||
then try to connect, and check if the set of bound addresses | then try to connect, and check if the set of bound addresses | ||||
matches the expected set. | matches the expected set. | ||||
''' | ''' | ||||
self.log.info("Bind test for %s" % str(addresses)) | self.log.info("Bind test for {}".format(str(addresses))) | ||||
expected = [(addr_to_hex(addr), port) for (addr, port) in expected] | expected = [(addr_to_hex(addr), port) for (addr, port) in expected] | ||||
base_args = ['-disablewallet', '-nolisten'] | base_args = ['-disablewallet', '-nolisten'] | ||||
if allow_ips: | if allow_ips: | ||||
base_args += ['-rpcallowip=' + x for x in allow_ips] | base_args += ['-rpcallowip=' + x for x in allow_ips] | ||||
binds = ['-rpcbind=' + addr for addr in addresses] | binds = ['-rpcbind=' + addr for addr in addresses] | ||||
parts = connect_to.split(':') | parts = connect_to.split(':') | ||||
if len(parts) == 2: | if len(parts) == 2: | ||||
self.nodes[0].host = parts[0] | self.nodes[0].host = parts[0] | ||||
self.nodes[0].rpc_port = parts[1] | self.nodes[0].rpc_port = parts[1] | ||||
else: | else: | ||||
self.nodes[0].host = connect_to | self.nodes[0].host = connect_to | ||||
self.nodes[0].rpc_port = rpc_port(self.nodes[0].index) | self.nodes[0].rpc_port = rpc_port(self.nodes[0].index) | ||||
self.start_node(0, base_args + binds) | self.start_node(0, base_args + binds) | ||||
pid = self.nodes[0].process.pid | pid = self.nodes[0].process.pid | ||||
assert_equal(set(get_bind_addrs(pid)), set(expected)) | assert_equal(set(get_bind_addrs(pid)), set(expected)) | ||||
self.stop_nodes() | self.stop_nodes() | ||||
def run_allowip_test(self, allow_ips, rpchost, rpcport): | def run_allowip_test(self, allow_ips, rpchost, rpcport): | ||||
''' | ''' | ||||
Start a node with rpcallow IP, and request getnetworkinfo | Start a node with rpcallow IP, and request getnetworkinfo | ||||
at a non-localhost IP. | at a non-localhost IP. | ||||
''' | ''' | ||||
self.log.info("Allow IP test for %s:%d" % (rpchost, rpcport)) | self.log.info("Allow IP test for {}:{}".format(rpchost, rpcport)) | ||||
base_args = ['-disablewallet', '-nolisten'] + \ | base_args = ['-disablewallet', '-nolisten'] + \ | ||||
['-rpcallowip=' + x for x in allow_ips] | ['-rpcallowip=' + x for x in allow_ips] | ||||
self.nodes[0].host = None | self.nodes[0].host = None | ||||
self.start_nodes([base_args]) | self.start_nodes([base_args]) | ||||
# connect to node through non-loopback interface | # connect to node through non-loopback interface | ||||
url = rpc_url(get_datadir_path(self.options.tmpdir, 0), | url = rpc_url(get_datadir_path(self.options.tmpdir, 0), | ||||
rpchost, rpcport) | rpchost, rpcport) | ||||
node = get_rpc_proxy(url, 0, coveragedir=self.options.coveragedir) | node = get_rpc_proxy(url, 0, coveragedir=self.options.coveragedir) | ||||
Show All 24 Lines | def run_test(self): | ||||
"This test requires at least one non-loopback IPv4 interface.") | "This test requires at least one non-loopback IPv4 interface.") | ||||
try: | try: | ||||
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) | s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) | ||||
s.connect(("::1", 1)) | s.connect(("::1", 1)) | ||||
s.close | s.close | ||||
except OSError: | except OSError: | ||||
raise SkipTest("This test requires IPv6 support.") | raise SkipTest("This test requires IPv6 support.") | ||||
self.log.info("Using interface %s for testing" % non_loopback_ip) | self.log.info("Using interface {} for testing".format(non_loopback_ip)) | ||||
defaultport = rpc_port(0) | defaultport = rpc_port(0) | ||||
# check default without rpcallowip (IPv4 and IPv6 localhost) | # check default without rpcallowip (IPv4 and IPv6 localhost) | ||||
self.run_bind_test(None, '127.0.0.1', [], | self.run_bind_test(None, '127.0.0.1', [], | ||||
[('127.0.0.1', defaultport), ('::1', defaultport)]) | [('127.0.0.1', defaultport), ('::1', defaultport)]) | ||||
# check default with rpcallowip (IPv6 any) | # check default with rpcallowip (IPv6 any) | ||||
self.run_bind_test(['127.0.0.1'], '127.0.0.1', [], | self.run_bind_test(['127.0.0.1'], '127.0.0.1', [], | ||||
Show All 33 Lines |