Changeset View
Changeset View
Standalone View
Standalone View
test/functional/rpc_users.py
Show All 15 Lines | |||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
from test_framework.util import ( | from test_framework.util import ( | ||||
assert_equal, | assert_equal, | ||||
get_datadir_path, | get_datadir_path, | ||||
str_to_b64str, | str_to_b64str, | ||||
) | ) | ||||
def call_with_auth(node, user, password): | |||||
url = urllib.parse.urlparse(node.url) | |||||
headers = { | |||||
"Authorization": | |||||
"Basic " + str_to_b64str('{}:{}'.format(user, password))} | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
conn.close() | |||||
return resp | |||||
class HTTPBasicsTest(BitcoinTestFramework): | class HTTPBasicsTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.num_nodes = 2 | self.num_nodes = 2 | ||||
def setup_chain(self): | def setup_chain(self): | ||||
super().setup_chain() | super().setup_chain() | ||||
# Append rpcauth to bitcoin.conf before initialization | # Append rpcauth to bitcoin.conf before initialization | ||||
rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144" | rpcauth = "rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144" | ||||
Show All 22 Lines | class HTTPBasicsTest(BitcoinTestFramework): | ||||
def run_test(self): | def run_test(self): | ||||
# | # | ||||
# Check correctness of the rpcauth config option # | # Check correctness of the rpcauth config option # | ||||
# | # | ||||
url = urllib.parse.urlparse(self.nodes[0].url) | url = urllib.parse.urlparse(self.nodes[0].url) | ||||
# Old authpair | |||||
authpair = url.username + ':' + url.password | |||||
# New authpair generated via share/rpcauth tool | |||||
password = "cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM=" | password = "cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM=" | ||||
# Second authpair with different username | |||||
password2 = "8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI=" | password2 = "8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI=" | ||||
authpairnew = "rt:" + password | |||||
self.log.info('Correct...') | self.log.info('Correct...') | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpair)} | assert_equal( | ||||
200, call_with_auth(self.nodes[0], url.username, | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | url.password).status) | ||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 200) | |||||
conn.close() | |||||
# Use new authpair to confirm both work | # Use new authpair to confirm both work | ||||
self.log.info('Correct...') | self.log.info('Correct...') | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | assert_equal( | ||||
200, call_with_auth(self.nodes[0], 'rt', password).status) | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 200) | |||||
conn.close() | |||||
# Wrong login name with rt's password | # Wrong login name with rt's password | ||||
self.log.info('Wrong...') | self.log.info('Wrong...') | ||||
authpairnew = "rtwrong:" + password | |||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | assert_equal( | ||||
conn.connect() | 401, call_with_auth(self.nodes[0], 'rtwrong', password).status) | ||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 401) | |||||
conn.close() | |||||
# Wrong password for rt | # Wrong password for rt | ||||
self.log.info('Wrong...') | self.log.info('Wrong...') | ||||
authpairnew = "rt:" + password + "wrong" | assert_equal( | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | 401, call_with_auth(self.nodes[0], 'rt', | ||||
password + 'wrong').status) | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 401) | |||||
conn.close() | |||||
# Correct for rt2 | # Correct for rt2 | ||||
self.log.info('Correct...') | self.log.info('Correct...') | ||||
authpairnew = "rt2:" + password2 | assert_equal( | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | 200, call_with_auth(self.nodes[0], 'rt2', password2).status) | ||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 200) | |||||
conn.close() | |||||
# Wrong password for rt2 | # Wrong password for rt2 | ||||
self.log.info('Wrong...') | self.log.info('Wrong...') | ||||
authpairnew = "rt2:" + password2 + "wrong" | assert_equal( | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | 401, call_with_auth(self.nodes[0], 'rt2', | ||||
password2 + 'wrong').status) | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 401) | |||||
conn.close() | |||||
# Correct for randomly generated user | # Correct for randomly generated user | ||||
self.log.info('Correct...') | self.log.info('Correct...') | ||||
authpairnew = self.user + ":" + self.password | assert_equal( | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | 200, call_with_auth(self.nodes[0], self.user, | ||||
self.password).status) | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 200) | |||||
conn.close() | |||||
# Wrong password for randomly generated user | # Wrong password for randomly generated user | ||||
self.log.info('Wrong...') | self.log.info('Wrong...') | ||||
authpairnew = self.user + ":" + self.password + "Wrong" | assert_equal( | ||||
headers = {"Authorization": "Basic " + str_to_b64str(authpairnew)} | 401, call_with_auth(self.nodes[0], self.user, | ||||
self.password + 'Wrong').status) | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 401) | |||||
conn.close() | |||||
############################################################### | ############################################################### | ||||
# Check correctness of the rpcuser/rpcpassword config options # | # Check correctness of the rpcuser/rpcpassword config options # | ||||
############################################################### | ############################################################### | ||||
url = urllib.parse.urlparse(self.nodes[1].url) | url = urllib.parse.urlparse(self.nodes[1].url) | ||||
# rpcuser and rpcpassword authpair | # rpcuser and rpcpassword authpair | ||||
self.log.info('Correct...') | self.log.info('Correct...') | ||||
rpcuserauthpair = "rpcuser💻:rpcpassword🔑" | assert_equal( | ||||
200, call_with_auth(self.nodes[1], "rpcuserđź’»", | |||||
headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)} | "rpcpassword🔑").status) | ||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 200) | |||||
conn.close() | |||||
# Wrong login name with rpcuser's password | # Wrong login name with rpcuser's password | ||||
rpcuserauthpair = "rpcuserwrong:rpcpassword" | self.log.info('Wrong...') | ||||
headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)} | assert_equal( | ||||
401, call_with_auth(self.nodes[1], 'rpcuserwrong', | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | 'rpcpassword').status) | ||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 401) | |||||
conn.close() | |||||
# Wrong password for rpcuser | # Wrong password for rpcuser | ||||
self.log.info('Wrong...') | self.log.info('Wrong...') | ||||
rpcuserauthpair = "rpcuser:rpcpasswordwrong" | assert_equal( | ||||
headers = {"Authorization": "Basic " + str_to_b64str(rpcuserauthpair)} | 401, call_with_auth(self.nodes[1], 'rpcuser', | ||||
'rpcpasswordwrong').status) | |||||
conn = http.client.HTTPConnection(url.hostname, url.port) | |||||
conn.connect() | |||||
conn.request('POST', '/', '{"method": "getbestblockhash"}', headers) | |||||
resp = conn.getresponse() | |||||
assert_equal(resp.status, 401) | |||||
conn.close() | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
HTTPBasicsTest().main() | HTTPBasicsTest().main() |