Changeset View
Changeset View
Standalone View
Standalone View
contrib/buildbot/teamcity_wrapper.py
Show All 14 Lines | |||||
class TeamcityRequestException(Exception): | class TeamcityRequestException(Exception): | ||||
pass | pass | ||||
class BuildInfo(UserDict): | class BuildInfo(UserDict): | ||||
@staticmethod | @staticmethod | ||||
def fromSingleBuildResponse(json_content): | def fromSingleBuildResponse(json_content): | ||||
return BuildInfo(json_content['build'][0]) | return BuildInfo(json_content["build"][0]) | ||||
def getCommits(self): | def getCommits(self): | ||||
return [change['version'] for change in self.data['changes'] | return ( | ||||
['change']] if 'changes' in (self.data or {}) else None | [change["version"] for change in self.data["changes"]["change"]] | ||||
if "changes" in (self.data or {}) | |||||
else None | |||||
) | |||||
def getProperties(self): | def getProperties(self): | ||||
propsList = [] | propsList = [] | ||||
if 'properties' in (self.data or {}): | if "properties" in (self.data or {}): | ||||
propsList = self.data['properties']['property'] | propsList = self.data["properties"]["property"] | ||||
# Transform list of properties [{'name': name, 'value': value}, ...] into a | # Transform list of properties [{'name': name, 'value': value}, ...] into a | ||||
# dict {name: value, ...} since we only care about the property values. | # dict {name: value, ...} since we only care about the property values. | ||||
properties = {} | properties = {} | ||||
for prop in propsList: | for prop in propsList: | ||||
properties[prop['name']] = prop['value'] | properties[prop["name"]] = prop["value"] | ||||
return properties if properties else None | return properties if properties else None | ||||
class TeamCity(): | class TeamCity: | ||||
def __init__(self, base_url, username, password): | def __init__(self, base_url, username, password): | ||||
self.session = requests.Session() | self.session = requests.Session() | ||||
self.base_url = base_url | self.base_url = base_url | ||||
self.auth = (username, password) | self.auth = (username, password) | ||||
self.logger = None | self.logger = None | ||||
self.mockTime = None | self.mockTime = None | ||||
with open(os.path.join(os.path.dirname(__file__), 'ignore-logs.txt'), 'rb') as ignoreList: | with open( | ||||
os.path.join(os.path.dirname(__file__), "ignore-logs.txt"), "rb" | |||||
) as ignoreList: | |||||
self.ignoreList = ignoreList.readlines() | self.ignoreList = ignoreList.readlines() | ||||
def set_logger(self, logger): | def set_logger(self, logger): | ||||
self.logger = logger | self.logger = logger | ||||
def getTime(self): | def getTime(self): | ||||
if self.mockTime: | if self.mockTime: | ||||
return self.mockTime | return self.mockTime | ||||
Show All 10 Lines | class TeamCity: | ||||
def getResponse(self, request, expectJson=True): | def getResponse(self, request, expectJson=True): | ||||
response = self.session.send(request.prepare()) | response = self.session.send(request.prepare()) | ||||
if response.status_code != requests.codes.ok: | if response.status_code != requests.codes.ok: | ||||
# Log the entire response, because something went wrong | # Log the entire response, because something went wrong | ||||
if self.logger: | if self.logger: | ||||
self.logger.info( | self.logger.info( | ||||
"Request:\n{}\n\nResponse:\n{}".format( | "Request:\n{}\n\nResponse:\n{}".format( | ||||
pprint( | pprint(vars(request)), pprint(vars(response)) | ||||
vars(request)), pprint( | ) | ||||
vars(response)))) | ) | ||||
raise TeamcityRequestException( | raise TeamcityRequestException( | ||||
f"Unexpected Teamcity API error! Status code: {response.status_code}") | f"Unexpected Teamcity API error! Status code: {response.status_code}" | ||||
) | |||||
content = response.content | content = response.content | ||||
if expectJson: | if expectJson: | ||||
content = json.loads(content) | content = json.loads(content) | ||||
# Log the response content to aid in debugging | # Log the response content to aid in debugging | ||||
if self.logger: | if self.logger: | ||||
self.logger.info(content) | self.logger.info(content) | ||||
return content | return content | ||||
def trigger_build(self, buildTypeId, ref, PHID=None, properties=None): | def trigger_build(self, buildTypeId, ref, PHID=None, properties=None): | ||||
endpoint = self.build_url("app/rest/buildQueue") | endpoint = self.build_url("app/rest/buildQueue") | ||||
if not properties: | if not properties: | ||||
properties = [] | properties = [] | ||||
if PHID is not None: | if PHID is not None: | ||||
properties.append({ | properties.append( | ||||
'name': 'env.harborMasterTargetPHID', | { | ||||
'value': PHID, | "name": "env.harborMasterTargetPHID", | ||||
}) | "value": PHID, | ||||
} | |||||
) | |||||
build = { | build = { | ||||
'branchName': ref, | "branchName": ref, | ||||
'buildType': { | "buildType": {"id": buildTypeId}, | ||||
'id': buildTypeId | "properties": { | ||||
"property": properties, | |||||
}, | }, | ||||
'properties': { | |||||
'property': properties, | |||||
} | |||||
} | } | ||||
req = self._request('POST', endpoint, json.dumps(build)) | req = self._request("POST", endpoint, json.dumps(build)) | ||||
return self.getResponse(req) | return self.getResponse(req) | ||||
def get_artifact(self, buildId, path): | def get_artifact(self, buildId, path): | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
f"app/rest/builds/id:{buildId}/artifacts/content/{path}" | f"app/rest/builds/id:{buildId}/artifacts/content/{path}" | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req, expectJson=False) | content = self.getResponse(req, expectJson=False) | ||||
if not content: | if not content: | ||||
return None | return None | ||||
return content.decode('utf-8') | return content.decode("utf-8") | ||||
def get_coverage_summary(self, buildId): | def get_coverage_summary(self, buildId): | ||||
return self.get_artifact( | return self.get_artifact(buildId, "coverage.tar.gz!/coverage-summary.txt") | ||||
buildId, "coverage.tar.gz!/coverage-summary.txt") | |||||
def get_clean_build_log(self, buildId): | def get_clean_build_log(self, buildId): | ||||
return self.get_artifact(buildId, "artifacts.tar.gz!/build.clean.log") | return self.get_artifact(buildId, "artifacts.tar.gz!/build.clean.log") | ||||
def getBuildLog(self, buildId): | def getBuildLog(self, buildId): | ||||
# Try to get the clean build log first, then fallback to the full log | # Try to get the clean build log first, then fallback to the full log | ||||
try: | try: | ||||
clean_log = self.get_clean_build_log(buildId) | clean_log = self.get_clean_build_log(buildId) | ||||
if clean_log: | if clean_log: | ||||
return clean_log | return clean_log | ||||
except TeamcityRequestException: | except TeamcityRequestException: | ||||
# This is likely a 404 and the log doesn't exist. Either way, | # This is likely a 404 and the log doesn't exist. Either way, | ||||
# ignore the failure since there is an alternative log we can | # ignore the failure since there is an alternative log we can | ||||
# fetch. | # fetch. | ||||
pass | pass | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"downloadBuildLog.html", | "downloadBuildLog.html", | ||||
{ | { | ||||
"buildId": buildId, | "buildId": buildId, | ||||
"archived": "true", | "archived": "true", | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req, expectJson=False) | content = self.getResponse(req, expectJson=False) | ||||
ret = "" | ret = "" | ||||
if not content: | if not content: | ||||
ret = "[Error Fetching Build Log]" | ret = "[Error Fetching Build Log]" | ||||
else: | else: | ||||
z = ZipFile(io.BytesIO(content)) | z = ZipFile(io.BytesIO(content)) | ||||
for filename in z.namelist(): | for filename in z.namelist(): | ||||
for line in z.open(filename).readlines(): | for line in z.open(filename).readlines(): | ||||
ret += line.decode('utf-8') | ret += line.decode("utf-8") | ||||
return ret.replace('\r\n', '\n') | return ret.replace("\r\n", "\n") | ||||
def getPreviewUrl(self, buildId): | def getPreviewUrl(self, buildId): | ||||
try: | try: | ||||
return self.get_artifact(buildId, "artifacts.tar.gz!/preview_url.log") | return self.get_artifact(buildId, "artifacts.tar.gz!/preview_url.log") | ||||
except TeamcityRequestException: | except TeamcityRequestException: | ||||
# This is likely a 404 and the log doesn't exist. | # This is likely a 404 and the log doesn't exist. | ||||
pass | pass | ||||
return None | return None | ||||
def getBuildProblems(self, buildId): | def getBuildProblems(self, buildId): | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/problemOccurrences", | "app/rest/problemOccurrences", | ||||
{ | { | ||||
"locator": f"build:(id:{buildId})", | "locator": f"build:(id:{buildId})", | ||||
"fields": "problemOccurrence(id,details)", | "fields": "problemOccurrence(id,details)", | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
if 'problemOccurrence' in (content or {}): | if "problemOccurrence" in (content or {}): | ||||
buildFailures = content['problemOccurrence'] | buildFailures = content["problemOccurrence"] | ||||
for failure in buildFailures: | for failure in buildFailures: | ||||
# Note: Unlike test failures, build "problems" do not have | # Note: Unlike test failures, build "problems" do not have | ||||
# a well-defined focus line in the build log. For now, we | # a well-defined focus line in the build log. For now, we | ||||
# link to the footer to automatically scroll to the bottom | # link to the footer to automatically scroll to the bottom | ||||
# of the log where failures tend to be. | # of the log where failures tend to be. | ||||
failure['logUrl'] = self.build_url( | failure["logUrl"] = self.build_url( | ||||
"viewLog.html", | "viewLog.html", | ||||
{ | { | ||||
"tab": "buildLog", | "tab": "buildLog", | ||||
"logTab": "tree", | "logTab": "tree", | ||||
"filter": "debug", | "filter": "debug", | ||||
"expand": "all", | "expand": "all", | ||||
"buildId": buildId, | "buildId": buildId, | ||||
}, | }, | ||||
"footer" | "footer", | ||||
) | ) | ||||
return buildFailures | return buildFailures | ||||
return [] | return [] | ||||
def getFailedTests(self, buildId): | def getFailedTests(self, buildId): | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/testOccurrences", | "app/rest/testOccurrences", | ||||
{ | { | ||||
"locator": f"build:(id:{buildId}),status:FAILURE", | "locator": f"build:(id:{buildId}),status:FAILURE", | ||||
"fields": "testOccurrence(id,details,name)", | "fields": "testOccurrence(id,details,name)", | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
if 'testOccurrence' in (content or {}): | if "testOccurrence" in (content or {}): | ||||
testFailures = content['testOccurrence'] | testFailures = content["testOccurrence"] | ||||
for failure in testFailures: | for failure in testFailures: | ||||
params = { | params = { | ||||
"tab": "buildLog", | "tab": "buildLog", | ||||
"logTab": "tree", | "logTab": "tree", | ||||
"filter": "debug", | "filter": "debug", | ||||
"expand": "all", | "expand": "all", | ||||
"buildId": buildId, | "buildId": buildId, | ||||
} | } | ||||
match = re.search(r'id:(\d+)', failure['id']) | match = re.search(r"id:(\d+)", failure["id"]) | ||||
if match: | if match: | ||||
params['_focus'] = match.group(1) | params["_focus"] = match.group(1) | ||||
failure['logUrl'] = self.build_url( | failure["logUrl"] = self.build_url("viewLog.html", params) | ||||
"viewLog.html", | |||||
params | |||||
) | |||||
return testFailures | return testFailures | ||||
return [] | return [] | ||||
def getBuildChangeDetails(self, changeId): | def getBuildChangeDetails(self, changeId): | ||||
endpoint = self.build_url(f"app/rest/changes/{changeId}") | endpoint = self.build_url(f"app/rest/changes/{changeId}") | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
return self.getResponse(req) or {} | return self.getResponse(req) or {} | ||||
def getBuildChanges(self, buildId): | def getBuildChanges(self, buildId): | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/changes", | "app/rest/changes", | ||||
{ | {"locator": f"build:(id:{buildId})", "fields": "change(id)"}, | ||||
"locator": f"build:(id:{buildId})", | |||||
"fields": "change(id)" | |||||
} | |||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
if 'change' in (content or {}): | if "change" in (content or {}): | ||||
changes = content['change'] | changes = content["change"] | ||||
for i, change in enumerate(changes): | for i, change in enumerate(changes): | ||||
changes[i] = self.getBuildChangeDetails(change['id']) | changes[i] = self.getBuildChangeDetails(change["id"]) | ||||
return changes | return changes | ||||
return [] | return [] | ||||
def getBuildInfo(self, buildId): | def getBuildInfo(self, buildId): | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/builds", | "app/rest/builds", | ||||
{ | { | ||||
"locator": f"id:{buildId}", | "locator": f"id:{buildId}", | ||||
# Note: Wildcard does not match recursively, so if you need data | # Note: Wildcard does not match recursively, so if you need data | ||||
# from a sub-field, be sure to include it in the list. | # from a sub-field, be sure to include it in the list. | ||||
"fields": "build(*,changes(*),properties(*),triggered(*))", | "fields": "build(*,changes(*),properties(*),triggered(*))", | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
if 'build' in (content or {}): | if "build" in (content or {}): | ||||
return BuildInfo.fromSingleBuildResponse(content) | return BuildInfo.fromSingleBuildResponse(content) | ||||
return BuildInfo() | return BuildInfo() | ||||
def checkBuildIsAutomated(self, buildInfo): | def checkBuildIsAutomated(self, buildInfo): | ||||
trigger = buildInfo['triggered'] | trigger = buildInfo["triggered"] | ||||
# Ignore builds by non-bot users, as these builds may be triggered for | # Ignore builds by non-bot users, as these builds may be triggered for | ||||
# any reason with various unknown configs | # any reason with various unknown configs | ||||
return trigger['type'] != 'user' or trigger['user']['username'] == self.auth[0] | return trigger["type"] != "user" or trigger["user"]["username"] == self.auth[0] | ||||
def checkBuildIsScheduled(self, buildInfo): | def checkBuildIsScheduled(self, buildInfo): | ||||
trigger = buildInfo['triggered'] | trigger = buildInfo["triggered"] | ||||
return trigger['type'] == 'schedule' | return trigger["type"] == "schedule" | ||||
# For all nested build configurations under a project, fetch the latest | # For all nested build configurations under a project, fetch the latest | ||||
# build failures. | # build failures. | ||||
def getLatestBuildAndTestFailures(self, projectId): | def getLatestBuildAndTestFailures(self, projectId): | ||||
buildEndpoint = self.build_url( | buildEndpoint = self.build_url( | ||||
"app/rest/problemOccurrences", | "app/rest/problemOccurrences", | ||||
{ | { | ||||
"locator": f"currentlyFailing:true,affectedProject:(id:{projectId})", | "locator": f"currentlyFailing:true,affectedProject:(id:{projectId})", | ||||
"fields": "problemOccurrence(*)", | "fields": "problemOccurrence(*)", | ||||
} | }, | ||||
) | ) | ||||
buildReq = self._request('GET', buildEndpoint) | buildReq = self._request("GET", buildEndpoint) | ||||
buildContent = self.getResponse(buildReq) | buildContent = self.getResponse(buildReq) | ||||
buildFailures = [] | buildFailures = [] | ||||
if 'problemOccurrence' in (buildContent or {}): | if "problemOccurrence" in (buildContent or {}): | ||||
buildFailures = buildContent['problemOccurrence'] | buildFailures = buildContent["problemOccurrence"] | ||||
testEndpoint = self.build_url( | testEndpoint = self.build_url( | ||||
"app/rest/testOccurrences", | "app/rest/testOccurrences", | ||||
{ | { | ||||
"locator": f"currentlyFailing:true,affectedProject:(id:{projectId})", | "locator": f"currentlyFailing:true,affectedProject:(id:{projectId})", | ||||
"fields": "testOccurrence(*)", | "fields": "testOccurrence(*)", | ||||
} | }, | ||||
) | ) | ||||
testReq = self._request('GET', testEndpoint) | testReq = self._request("GET", testEndpoint) | ||||
testContent = self.getResponse(testReq) | testContent = self.getResponse(testReq) | ||||
testFailures = [] | testFailures = [] | ||||
if 'testOccurrence' in (testContent or {}): | if "testOccurrence" in (testContent or {}): | ||||
testFailures = testContent['testOccurrence'] | testFailures = testContent["testOccurrence"] | ||||
return (buildFailures, testFailures) | return (buildFailures, testFailures) | ||||
def getLatestCompletedBuild(self, buildType, build_fields=None): | def getLatestCompletedBuild(self, buildType, build_fields=None): | ||||
if not build_fields: | if not build_fields: | ||||
build_fields = ['id'] | build_fields = ["id"] | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/builds", | "app/rest/builds", | ||||
{ | { | ||||
"locator": f"buildType:{buildType}", | "locator": f"buildType:{buildType}", | ||||
"fields": f"build({','.join(build_fields)})", | "fields": f"build({','.join(build_fields)})", | ||||
"count": 1, | "count": 1, | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
builds = content.get('build', []) | builds = content.get("build", []) | ||||
# There might be no build completed yet, in this case return None | # There might be no build completed yet, in this case return None | ||||
if not builds: | if not builds: | ||||
return None | return None | ||||
# But there should be no more than a single build | # But there should be no more than a single build | ||||
if len(builds) > 1: | if len(builds) > 1: | ||||
raise AssertionError( | raise AssertionError( | ||||
f"Unexpected Teamcity result. Called:\n{endpoint}\nGot:\n{content}" | f"Unexpected Teamcity result. Called:\n{endpoint}\nGot:\n{content}" | ||||
) | ) | ||||
return builds[0] | return builds[0] | ||||
def formatTime(self, seconds): | def formatTime(self, seconds): | ||||
return time.strftime('%Y%m%dT%H%M%S%z', time.gmtime(seconds)) | return time.strftime("%Y%m%dT%H%M%S%z", time.gmtime(seconds)) | ||||
# The returned count is the number of groups of back-to-back failures, not | # The returned count is the number of groups of back-to-back failures, not | ||||
# the number of individual failures | # the number of individual failures | ||||
def getNumAggregateFailuresSince(self, buildType, since): | def getNumAggregateFailuresSince(self, buildType, since): | ||||
sinceTime = self.getTime() - since | sinceTime = self.getTime() - since | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/builds", | "app/rest/builds", | ||||
{ | { | ||||
"locator": f"buildType:{buildType},sinceDate:{self.formatTime(sinceTime)}", | "locator": ( | ||||
f"buildType:{buildType},sinceDate:{self.formatTime(sinceTime)}" | |||||
), | |||||
"fields": "build", | "fields": "build", | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
if 'build' in (content or {}): | if "build" in (content or {}): | ||||
builds = [{'status': 'SUCCESS'}] + content['build'] | builds = [{"status": "SUCCESS"}] + content["build"] | ||||
return sum([(builds[i - 1]['status'], builds[i]['status']) | return sum( | ||||
== ('SUCCESS', 'FAILURE') for i in range(1, len(builds))]) | [ | ||||
(builds[i - 1]["status"], builds[i]["status"]) | |||||
== ("SUCCESS", "FAILURE") | |||||
for i in range(1, len(builds)) | |||||
] | |||||
) | |||||
return 0 | return 0 | ||||
# For each of the given build name from the configuration file, associate the | # For each of the given build name from the configuration file, associate the | ||||
# teamcity build type id and teamcity build name | # teamcity build type id and teamcity build name | ||||
def associate_configuration_names(self, project_id, config_names): | def associate_configuration_names(self, project_id, config_names): | ||||
# Get all the build configurations related to the given project, and | # Get all the build configurations related to the given project, and | ||||
# heavily filter the output to only return the id, name, project info | # heavily filter the output to only return the id, name, project info | ||||
# and the property name matching the configuration file. | # and the property name matching the configuration file. | ||||
endpoint = self.build_url( | endpoint = self.build_url( | ||||
"app/rest/buildTypes", | "app/rest/buildTypes", | ||||
{ | { | ||||
"locator": f"affectedProject:{project_id}", | "locator": f"affectedProject:{project_id}", | ||||
"fields": "buildType(project(id,name),id,name,parameters($locator(name:env.ABC_BUILD_NAME),property))", | "fields": "buildType(project(id,name),id,name,parameters($locator(name:env.ABC_BUILD_NAME),property))", | ||||
} | }, | ||||
) | ) | ||||
req = self._request('GET', endpoint) | req = self._request("GET", endpoint) | ||||
content = self.getResponse(req) | content = self.getResponse(req) | ||||
# Example of output: | # Example of output: | ||||
# "buildType": [ | # "buildType": [ | ||||
# { | # { | ||||
# "id": "BitcoinABC_Master_Build1", | # "id": "BitcoinABC_Master_Build1", | ||||
# "name": "My build 1", | # "name": "My build 1", | ||||
# "project": { | # "project": { | ||||
Show All 23 Lines | def associate_configuration_names(self, project_id, config_names): | ||||
# "value": "build-2" | # "value": "build-2" | ||||
# } | # } | ||||
# ] | # ] | ||||
# } | # } | ||||
# } | # } | ||||
# ] | # ] | ||||
associated_config = {} | associated_config = {} | ||||
for build_type in content.get('buildType', {}): | for build_type in content.get("buildType", {}): | ||||
if 'parameters' not in build_type: | if "parameters" not in build_type: | ||||
continue | continue | ||||
properties = build_type['parameters'].get('property', []) | properties = build_type["parameters"].get("property", []) | ||||
for build_property in properties: | for build_property in properties: | ||||
# Because of our filter, the only possible property is the one we | # Because of our filter, the only possible property is the one we | ||||
# are after. Looking at the value is enough. | # are after. Looking at the value is enough. | ||||
config_name = build_property.get('value', None) | config_name = build_property.get("value", None) | ||||
if config_name in config_names: | if config_name in config_names: | ||||
associated_config.update({ | associated_config.update( | ||||
{ | |||||
config_name: { | config_name: { | ||||
"teamcity_build_type_id": build_type['id'], | "teamcity_build_type_id": build_type["id"], | ||||
"teamcity_build_name": build_type['name'], | "teamcity_build_name": build_type["name"], | ||||
"teamcity_project_id": build_type['project']['id'], | "teamcity_project_id": build_type["project"]["id"], | ||||
"teamcity_project_name": build_type['project']['name'], | "teamcity_project_name": build_type["project"]["name"], | ||||
} | } | ||||
}) | } | ||||
) | |||||
return associated_config | return associated_config | ||||
def build_url(self, path="", params=None, fragment=None): | def build_url(self, path="", params=None, fragment=None): | ||||
if params is None: | if params is None: | ||||
params = {} | params = {} | ||||
# Make guest access the default when not calling the rest API. | # Make guest access the default when not calling the rest API. | ||||
# The caller can explicitly set guest=0 to bypass this behavior. | # The caller can explicitly set guest=0 to bypass this behavior. | ||||
if "guest" not in params and not path.startswith("app/rest/"): | if "guest" not in params and not path.startswith("app/rest/"): | ||||
params["guest"] = 1 | params["guest"] = 1 | ||||
scheme, netloc = urlsplit(self.base_url)[0:2] | scheme, netloc = urlsplit(self.base_url)[0:2] | ||||
return urlunsplit(( | return urlunsplit( | ||||
scheme, | (scheme, netloc, path, urlencode(params, doseq=True), fragment) | ||||
netloc, | ) | ||||
path, | |||||
urlencode(params, doseq=True), | |||||
fragment | |||||
)) | |||||
def convert_to_guest_url(self, url): | def convert_to_guest_url(self, url): | ||||
parsed_url = urlsplit(url) | parsed_url = urlsplit(url) | ||||
# Don't touch unrelated URLs. | # Don't touch unrelated URLs. | ||||
parsed_base_url = urlsplit(self.base_url) | parsed_base_url = urlsplit(self.base_url) | ||||
if parsed_base_url.scheme != parsed_url.scheme or parsed_base_url.netloc != parsed_url.netloc: | if ( | ||||
parsed_base_url.scheme != parsed_url.scheme | |||||
or parsed_base_url.netloc != parsed_url.netloc | |||||
): | |||||
return url | return url | ||||
return self.build_url( | return self.build_url( | ||||
parsed_url.path, | parsed_url.path, parse_qs(parsed_url.query), parsed_url.fragment | ||||
parse_qs(parsed_url.query), | |||||
parsed_url.fragment | |||||
) | ) | ||||
def _request(self, verb, url, data=None, headers=None): | def _request(self, verb, url, data=None, headers=None): | ||||
if self.logger: | if self.logger: | ||||
self.logger.info(f'{verb}: {url}') | self.logger.info(f"{verb}: {url}") | ||||
if headers is None: | if headers is None: | ||||
headers = { | headers = {"Accept": "application/json", "Content-Type": "application/json"} | ||||
'Accept': 'application/json', | req = requests.Request(verb, url, auth=self.auth, headers=headers) | ||||
'Content-Type': 'application/json' | |||||
} | |||||
req = requests.Request( | |||||
verb, | |||||
url, | |||||
auth=self.auth, | |||||
headers=headers) | |||||
req.data = data | req.data = data | ||||
return req | return req |