def call_with_auth(node, user, password):
    """Issue a ``getbestblockhash`` JSON-RPC POST to *node* with HTTP Basic auth.

    Args:
        node: Object whose ``url`` attribute is parsed with
            ``urllib.parse.urlparse``; only its hostname and port are used
            to open the connection.
        user: Username placed before the colon in the credential pair.
        password: Password placed after the colon in the credential pair.

    Returns:
        The response object returned by ``HTTPConnection.getresponse()``.
        The connection is closed before this function returns, so callers
        should rely on already-parsed fields such as ``status``.
    """
    url = urllib.parse.urlparse(node.url)
    # The "user:password" pair is encoded with the str_to_b64str helper
    # before being sent in the Authorization header.
    headers = {
        "Authorization":
            "Basic " + str_to_b64str('{}:{}'.format(user, password))}

    conn = http.client.HTTPConnection(url.hostname, url.port)
    try:
        conn.connect()
        conn.request('POST', '/', '{"method": "getbestblockhash"}', headers)
        return conn.getresponse()
    finally:
        # Close on every path: a failing connect/request/getresponse must
        # not leave the socket open for the rest of the test run.
        conn.close()
{"Authorization": "Basic " + str_to_b64str(authpairnew)} - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 401) - conn.close() + assert_equal( + 401, call_with_auth(self.nodes[0], 'rtwrong', password).status) # Wrong password for rt self.log.info('Wrong...') - authpairnew = "rt:" + password + "wrong" - headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 401) - conn.close() + assert_equal( + 401, call_with_auth(self.nodes[0], 'rt', + password + 'wrong').status) # Correct for rt2 self.log.info('Correct...') - authpairnew = "rt2:" + password2 - headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 200) - conn.close() + assert_equal( + 200, call_with_auth(self.nodes[0], 'rt2', password2).status) # Wrong password for rt2 self.log.info('Wrong...') - authpairnew = "rt2:" + password2 + "wrong" - headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 401) - conn.close() + assert_equal( + 401, call_with_auth(self.nodes[0], 'rt2', + password2 + 'wrong').status) # Correct for randomly generated user self.log.info('Correct...') - authpairnew = self.user + ":" + self.password - headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} - - conn = 
http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 200) - conn.close() + assert_equal( + 200, call_with_auth(self.nodes[0], self.user, + self.password).status) # Wrong password for randomly generated user self.log.info('Wrong...') - authpairnew = self.user + ":" + self.password + "Wrong" - headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 401) - conn.close() + assert_equal( + 401, call_with_auth(self.nodes[0], self.user, + self.password + 'Wrong').status) ############################################################### # Check correctness of the rpcuser/rpcpassword config options # @@ -169,39 +128,21 @@ # rpcuser and rpcpassword authpair self.log.info('Correct...') - rpcuserauthpair = "rpcuser💻:rpcpassword🔑" - - headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 200) - conn.close() + assert_equal( + 200, call_with_auth(self.nodes[1], "rpcuser💻", + "rpcpassword🔑").status) # Wrong login name with rpcuser's password - rpcuserauthpair = "rpcuserwrong:rpcpassword" - headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 401) - conn.close() + self.log.info('Wrong...') + assert_equal( + 401, call_with_auth(self.nodes[1], 'rpcuserwrong', + 'rpcpassword').status) # Wrong 
password for rpcuser self.log.info('Wrong...') - rpcuserauthpair = "rpcuser:rpcpasswordwrong" - headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)} - - conn = http.client.HTTPConnection(url.hostname, url.port) - conn.connect() - conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) - resp = conn.getresponse() - assert_equal(resp.status, 401) - conn.close() + assert_equal( + 401, call_with_auth(self.nodes[1], 'rpcuser', + 'rpcpasswordwrong').status) if __name__ == '__main__':