diff --git a/contrib/buildbot/teamcity_wrapper.py b/contrib/buildbot/teamcity_wrapper.py index 872a1162c..ce05ed580 100755 --- a/contrib/buildbot/teamcity_wrapper.py +++ b/contrib/buildbot/teamcity_wrapper.py @@ -1,497 +1,497 @@ #!/usr/bin/env python3 from collections import UserDict import io import json import os from pprint import pprint import re import requests import time from urllib.parse import ( parse_qs, urlencode, urlsplit, urlunsplit, ) from zipfile import ZipFile class TeamcityRequestException(Exception): pass class BuildInfo(UserDict): @staticmethod def fromSingleBuildResponse(json_content): return BuildInfo(json_content['build'][0]) def getCommits(self): return [change['version'] for change in self.data['changes'] ['change']] if 'changes' in (self.data or {}) else None def getProperties(self): propsList = [] if 'properties' in (self.data or {}): propsList = self.data['properties']['property'] # Transform list of properties [{'name': name, 'value': value}, ...] into a # dict {name: value, ...} since we only care about the property values. properties = {} for prop in propsList: properties[prop['name']] = prop['value'] return properties if properties else None class TeamCity(): def __init__(self, base_url, username, password): self.session = requests.Session() self.base_url = base_url self.auth = (username, password) self.logger = None self.mockTime = None with open(os.path.join(os.path.dirname(__file__), 'ignore-logs.txt'), 'rb') as ignoreList: self.ignoreList = ignoreList.readlines() def set_logger(self, logger): self.logger = logger def getTime(self): if self.mockTime: return self.mockTime # time.time() returns a float, so we cast to an int to make it play nice with our other APIs. # We do not care about sub-second precision anyway. return int(time.time()) - def getIgnoreList(self, ignoreList): + def getIgnoreList(self): return self.ignoreList def setMockTime(self, mockTime): self.mockTime = mockTime def getResponse(self, request, expectJson=True): response = self.session.send(request.prepare()) if response.status_code != requests.codes.ok: # Log the entire response, because something went wrong if self.logger: self.logger.info( "Request:\n{}\n\nResponse:\n{}".format( pprint( vars(request)), pprint( vars(response)))) raise TeamcityRequestException( "Unexpected Teamcity API error! Status code: {}".format( response.status_code)) content = response.content if expectJson: content = json.loads(content) # Log the response content to aid in debugging if self.logger: self.logger.info(content) return content def trigger_build(self, buildTypeId, ref, PHID=None, properties=None): endpoint = self.build_url("app/rest/buildQueue") if not properties: properties = [] if PHID is not None: properties.append({ 'name': 'env.harborMasterTargetPHID', 'value': PHID, }) build = { 'branchName': ref, 'buildType': { 'id': buildTypeId }, 'properties': { 'property': properties, } } req = self._request('POST', endpoint, json.dumps(build)) return self.getResponse(req) def get_artifact(self, buildId, path): endpoint = self.build_url( "app/rest/builds/id:{}/artifacts/content/{}".format(buildId, path) ) req = self._request('GET', endpoint) content = self.getResponse(req, expectJson=False) if not content: return None return content.decode('utf-8') def get_coverage_summary(self, buildId): return self.get_artifact( buildId, "coverage.tar.gz!/coverage-summary.txt") def get_clean_build_log(self, buildId): return self.get_artifact(buildId, "artifacts.tar.gz!/build.clean.log") def getBuildLog(self, buildId): # Try to get the clean build log first, then fallback to the full log try: clean_log = self.get_clean_build_log(buildId) if clean_log: return clean_log except TeamcityRequestException: # This is likely a 404 and the log doesn't exist. Either way, # ignore the failure since there is an alternative log we can # fetch. pass endpoint = self.build_url( "downloadBuildLog.html", { "buildId": buildId, "archived": "true", } ) req = self._request('GET', endpoint) content = self.getResponse(req, expectJson=False) ret = "" if not content: ret = "[Error Fetching Build Log]" else: z = ZipFile(io.BytesIO(content)) for filename in z.namelist(): for line in z.open(filename).readlines(): ret += line.decode('utf-8') return ret.replace('\r\n', '\n') def getBuildProblems(self, buildId): endpoint = self.build_url( "app/rest/problemOccurrences", { "locator": "build:(id:{})".format(buildId), "fields": "problemOccurrence(id,details)", } ) req = self._request('GET', endpoint) content = self.getResponse(req) if 'problemOccurrence' in (content or {}): buildFailures = content['problemOccurrence'] for failure in buildFailures: # Note: Unlike test failures, build "problems" do not have # a well-defined focus line in the build log. For now, we # link to the footer to automatically scroll to the bottom # of the log where failures tend to be. failure['logUrl'] = self.build_url( "viewLog.html", { "tab": "buildLog", "logTab": "tree", "filter": "debug", "expand": "all", "buildId": buildId, }, "footer" ) return buildFailures return [] def getFailedTests(self, buildId): endpoint = self.build_url( "app/rest/testOccurrences", { "locator": "build:(id:{}),status:FAILURE".format(buildId), "fields": "testOccurrence(id,details,name)", } ) req = self._request('GET', endpoint) content = self.getResponse(req) if 'testOccurrence' in (content or {}): testFailures = content['testOccurrence'] for failure in testFailures: params = { "tab": "buildLog", "logTab": "tree", "filter": "debug", "expand": "all", "buildId": buildId, } match = re.search(r'id:(\d+)', failure['id']) if match: params['_focus'] = match.group(1) failure['logUrl'] = self.build_url( "viewLog.html", params ) return testFailures return [] def getBuildChangeDetails(self, changeId): endpoint = self.build_url("app/rest/changes/{}".format(changeId)) req = self._request('GET', endpoint) return self.getResponse(req) or {} def getBuildChanges(self, buildId): endpoint = self.build_url( "app/rest/changes", { "locator": "build:(id:{})".format(buildId), "fields": "change(id)" } ) req = self._request('GET', endpoint) content = self.getResponse(req) if 'change' in (content or {}): changes = content['change'] for i, change in enumerate(changes): changes[i] = self.getBuildChangeDetails(change['id']) return changes return [] def getBuildInfo(self, buildId): endpoint = self.build_url( "app/rest/builds", { "locator": "id:{}".format(buildId), # Note: Wildcard does not match recursively, so if you need data # from a sub-field, be sure to include it in the list. "fields": "build(*,changes(*),properties(*),triggered(*))", } ) req = self._request('GET', endpoint) content = self.getResponse(req) if 'build' in (content or {}): return BuildInfo.fromSingleBuildResponse(content) return BuildInfo() def checkBuildIsAutomated(self, buildInfo): trigger = buildInfo['triggered'] # Ignore builds by non-bot users, as these builds may be triggered for # any reason with various unknown configs return trigger['type'] != 'user' or trigger['user']['username'] == self.auth[0] def checkBuildIsScheduled(self, buildInfo): trigger = buildInfo['triggered'] return trigger['type'] == 'schedule' # For all nested build configurations under a project, fetch the latest # build failures. def getLatestBuildAndTestFailures(self, projectId): buildEndpoint = self.build_url( "app/rest/problemOccurrences", { "locator": "currentlyFailing:true,affectedProject:(id:{})".format(projectId), "fields": "problemOccurrence(*)", } ) buildReq = self._request('GET', buildEndpoint) buildContent = self.getResponse(buildReq) buildFailures = [] if 'problemOccurrence' in (buildContent or {}): buildFailures = buildContent['problemOccurrence'] testEndpoint = self.build_url( "app/rest/testOccurrences", { "locator": "currentlyFailing:true,affectedProject:(id:{})".format(projectId), "fields": "testOccurrence(*)", } ) testReq = self._request('GET', testEndpoint) testContent = self.getResponse(testReq) testFailures = [] if 'testOccurrence' in (testContent or {}): testFailures = testContent['testOccurrence'] return (buildFailures, testFailures) def getLatestCompletedBuild(self, buildType, build_fields=None): if not build_fields: build_fields = ['id'] endpoint = self.build_url( "app/rest/builds", { "locator": "buildType:{}".format(buildType), "fields": "build({})".format(",".join(build_fields)), "count": 1, } ) req = self._request('GET', endpoint) content = self.getResponse(req) builds = content.get('build', []) # There might be no build completed yet, in this case return None if not builds: return None # But there should be no more than a single build if len(builds) > 1: raise AssertionError( "Unexpected Teamcity result. Called:\n{}\nGot:\n{}".format( endpoint, content ) ) return builds[0] def formatTime(self, seconds): return time.strftime('%Y%m%dT%H%M%S%z', time.gmtime(seconds)) # The returned count is the number of groups of back-to-back failures, not # the number of individual failures def getNumAggregateFailuresSince(self, buildType, since): sinceTime = self.getTime() - since endpoint = self.build_url( "app/rest/builds", { "locator": "buildType:{},sinceDate:{}".format(buildType, self.formatTime(sinceTime)), "fields": "build", } ) req = self._request('GET', endpoint) content = self.getResponse(req) if 'build' in (content or {}): builds = [{'status': 'SUCCESS'}] + content['build'] return sum([(builds[i - 1]['status'], builds[i]['status']) == ('SUCCESS', 'FAILURE') for i in range(1, len(builds))]) return 0 # For each of the given build name from the configuration file, associate the # teamcity build type id and teamcity build name def associate_configuration_names(self, project_id, config_names): # Get all the build configurations related to the given project, and # heavily filter the output to only return the id, name, project info # and the property name matching the configuration file. endpoint = self.build_url( "app/rest/buildTypes", { "locator": "affectedProject:{}".format(project_id), "fields": "buildType(project(id,name),id,name,parameters($locator(name:env.ABC_BUILD_NAME),property))", } ) req = self._request('GET', endpoint) content = self.getResponse(req) # Example of output: # "buildType": [ # { # "id": "BitcoinABC_Master_Build1", # "name": "My build 1", # "project": { # "id": "BitcoinABC_Master", # "name": "Master" # }, # "parameters": { # "property": [ # { # "name": "env.ABC_BUILD_NAME", # "value": "build-1" # } # ] # } # }, # { # "id": "BitcoinABC_Master_Build2", # "name": "My build 2", # "project": { # "id": "BitcoinABC_Master", # "name": "Master" # }, # "parameters": { # "property": [ # { # "name": "env.ABC_BUILD_NAME", # "value": "build-2" # } # ] # } # } # ] associated_config = {} for build_type in content.get('buildType', {}): if 'parameters' not in build_type: continue properties = build_type['parameters'].get('property', []) for build_property in properties: # Because of our filter, the only possible property is the one we # are after. Looking at the value is enough. config_name = build_property.get('value', None) if config_name in config_names: associated_config.update({ config_name: { "teamcity_build_type_id": build_type['id'], "teamcity_build_name": build_type['name'], "teamcity_project_id": build_type['project']['id'], "teamcity_project_name": build_type['project']['name'], } }) return associated_config def build_url(self, path="", params=None, fragment=None): if params is None: params = {} # Make guest access the default when not calling the rest API. # The caller can explicitly set guest=0 to bypass this behavior. if "guest" not in params and not path.startswith("app/rest/"): params["guest"] = 1 scheme, netloc = urlsplit(self.base_url)[0:2] return urlunsplit(( scheme, netloc, path, urlencode(params, doseq=True), fragment )) def convert_to_guest_url(self, url): parsed_url = urlsplit(url) # Don't touch unrelated URLs. parsed_base_url = urlsplit(self.base_url) if parsed_base_url.scheme != parsed_url.scheme or parsed_base_url.netloc != parsed_url.netloc: return url return self.build_url( parsed_url.path, parse_qs(parsed_url.query), parsed_url.fragment ) def _request(self, verb, url, data=None, headers=None): if self.logger: self.logger.info('{}: {}'.format(verb, url)) if headers is None: headers = { 'Accept': 'application/json', 'Content-Type': 'application/json' } req = requests.Request( verb, url, auth=self.auth, headers=headers) req.data = data return req diff --git a/contrib/buildbot/test/test_teamcity.py b/contrib/buildbot/test/test_teamcity.py index 6bd2cc17e..0bb7e2a81 100755 --- a/contrib/buildbot/test/test_teamcity.py +++ b/contrib/buildbot/test/test_teamcity.py @@ -1,636 +1,641 @@ #!/usr/bin/env python3 # # Copyright (c) 2019 The Bitcoin ABC developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import json import mock import requests import time import unittest from urllib.parse import urljoin from teamcity_wrapper import TeamcityRequestException from testutil import AnyWith import test.mocks.teamcity class TeamcityTests(unittest.TestCase): def setUp(self): self.teamcity = test.mocks.teamcity.instance() def tearDown(self): pass + def test_ignoreList(self): + expectedList = [b'test'] + self.teamcity.ignoreList = expectedList + assert self.teamcity.getIgnoreList() == expectedList + def test_mockTime(self): currentTime = int(time.time()) - 1 assert self.teamcity.getTime() >= currentTime self.teamcity.setMockTime(1593635000) assert self.teamcity.getTime() == 1593635000 def test_build_url(self): assert self.teamcity.build_url() == urljoin(self.teamcity.base_url, "?guest=1") assert self.teamcity.build_url("foo.html") == urljoin( self.teamcity.base_url, "foo.html?guest=1") assert self.teamcity.build_url( "foo.html", { "foo": "bar", "bar": "baz", }) == urljoin(self.teamcity.base_url, "foo.html?foo=bar&bar=baz&guest=1") assert self.teamcity.build_url( "foo.html", { "foo": "bar", "baz": 42, }) == urljoin(self.teamcity.base_url, "foo.html?foo=bar&baz=42&guest=1") assert self.teamcity.build_url( "foo.html", { "foo": "bar", "baz": 42 }, "anchor") == urljoin(self.teamcity.base_url, "foo.html?foo=bar&baz=42&guest=1#anchor") # No path, a fragment but no query assert self.teamcity.build_url( fragment="anchor") == urljoin(self.teamcity.base_url, "?guest=1#anchor") # Some path, a fragment but no query assert self.teamcity.build_url( "foo.html", fragment="anchor") == urljoin(self.teamcity.base_url, "foo.html?guest=1#anchor") # Use RFC 3986 compliant chars assert self.teamcity.build_url( "foo.html", { "valid": "build($changes(*),properties(?),'triggered([a]:!b&c)')" }) == urljoin(self.teamcity.base_url, "foo.html?valid=build%28%24changes%28%2A%29%2Cproperties%28%3F%29%2C%27triggered%28%5Ba%5D%3A%21b%26c%29%27%29&guest=1") # Check other chars are also quoted/unquoted correctly assert self.teamcity.build_url( "foo.html", { "invalid": "space space,slash/slash,doublequote\"doublequote" }) == urljoin(self.teamcity.base_url, "foo.html?invalid=space+space%2Cslash%2Fslash%2Cdoublequote%22doublequote&guest=1") # The guest is already set to any value assert self.teamcity.build_url( "foo.html", { "foo": "bar", "guest": 0, }) == urljoin(self.teamcity.base_url, "foo.html?foo=bar&guest=0") assert self.teamcity.build_url( "foo.html", { "foo": "bar", "guest": 1, }) == urljoin(self.teamcity.base_url, "foo.html?foo=bar&guest=1") # No guest=1 parameter is appended when calling the rest API assert self.teamcity.build_url( "app/rest/foo", { "foo": "bar", }) == urljoin(self.teamcity.base_url, "app/rest/foo?foo=bar") def test_convert_to_guest_url(self): expect_no_update = [ # Not a valid teamcity URL "", "http://foo.bar", # Already a guest urljoin(self.teamcity.base_url, "?guest=1"), urljoin(self.teamcity.base_url, "?foo=bar&guest=1"), urljoin(self.teamcity.base_url, "?foo=bar&guest=1#anchor"), ] expect_update = [ ( self.teamcity.base_url, urljoin(self.teamcity.base_url, "?guest=1") ), ( urljoin(self.teamcity.base_url, "?"), urljoin(self.teamcity.base_url, "?guest=1") ), ( urljoin(self.teamcity.base_url, "?foo=bar"), urljoin(self.teamcity.base_url, "?foo=bar&guest=1") ), ( urljoin(self.teamcity.base_url, "?foo=bar&bar=baz"), urljoin(self.teamcity.base_url, "?foo=bar&bar=baz&guest=1") ), ( urljoin(self.teamcity.base_url, "#anchor"), urljoin(self.teamcity.base_url, "?guest=1#anchor") ), ( urljoin(self.teamcity.base_url, "?foo=bar#anchor"), urljoin(self.teamcity.base_url, "?foo=bar&guest=1#anchor") ), ( urljoin(self.teamcity.base_url, "?foo=bar&bar=baz#anchor"), urljoin( self.teamcity.base_url, "?foo=bar&bar=baz&guest=1#anchor") ), ] for url in expect_no_update: assert self.teamcity.convert_to_guest_url(url) == url for url_in, url_out in expect_update: assert self.teamcity.convert_to_guest_url(url_in) == url_out def test_requestFailure(self): self.teamcity.session.send.return_value.status_code = requests.codes.bad_request req = self.teamcity._request('GET', 'https://endpoint') self.assertRaises( TeamcityRequestException, self.teamcity.getResponse, req) def test_getBuildProblems_noProblems(self): self.teamcity.session.send.return_value.content = json.dumps({}) output = self.teamcity.getBuildProblems('1234') assert output == [] self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/problemOccurrences", { "locator": "build:(id:1234)", "fields": "problemOccurrence(id,details)", } ) })) def test_getBuildProblems_hasProblems(self): problems = [{ 'id': 'id:2500,build:(id:12345)', 'details': 'test-details', }] self.teamcity.session.send.return_value.content = json.dumps({ 'problemOccurrence': problems, }) output = self.teamcity.getBuildProblems('1234') assert output[0]['id'] == problems[0]['id'] assert output[0]['details'] == problems[0]['details'] assert output[0]['logUrl'] == self.teamcity.build_url( "viewLog.html", { "tab": "buildLog", "logTab": "tree", "filter": "debug", "expand": "all", "buildId": 1234, }, "footer" ) self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/problemOccurrences", { "locator": "build:(id:1234)", "fields": "problemOccurrence(id,details)", } ) })) def test_getFailedTests_noTestFailures(self): self.teamcity.session.send.return_value.content = json.dumps({}) output = self.teamcity.getFailedTests('1234') assert output == [] self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/testOccurrences", { "locator": "build:(id:1234),status:FAILURE", "fields": "testOccurrence(id,details,name)", } ) })) def test_getFailedTests_hasTestFailures(self): failures = [{ 'id': 'id:2500,build:(id:12345)', 'details': 'stacktrace', 'name': 'test name', }] self.teamcity.session.send.return_value.content = json.dumps({ 'testOccurrence': failures, }) output = self.teamcity.getFailedTests('1234') assert output[0]['id'] == failures[0]['id'] assert output[0]['details'] == failures[0]['details'] assert output[0]['name'] == failures[0]['name'] assert output[0]['logUrl'] == self.teamcity.build_url( "viewLog.html", { "tab": "buildLog", "logTab": "tree", "filter": "debug", "expand": "all", "buildId": 1234, "_focus": 2500, } ) self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/testOccurrences", { "locator": "build:(id:1234),status:FAILURE", "fields": "testOccurrence(id,details,name)", } ) })) def test_triggerBuild(self): triggerBuildResponse = test.mocks.teamcity.buildInfo( test.mocks.teamcity.buildInfo_changes(['test-change'])) self.teamcity.session.send.return_value = triggerBuildResponse output = self.teamcity.trigger_build('1234', 'branch-name', 'test-phid', [{ 'name': 'another-property', 'value': 'some value', }]) assert output == json.loads(triggerBuildResponse.content) self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url("app/rest/buildQueue"), 'body': json.dumps({ 'branchName': 'branch-name', 'buildType': { 'id': '1234', }, 'properties': { 'property': [{ 'name': 'another-property', 'value': 'some value', }, { 'name': 'env.harborMasterTargetPHID', 'value': 'test-phid', }], }, }), })) def test_getBuildChangeDetails(self): expectedOutput = { 'username': 'email@bitcoinabc.org', 'user': { 'name': 'Author Name', }, } self.teamcity.session.send.return_value.content = json.dumps( expectedOutput) output = self.teamcity.getBuildChangeDetails('1234') assert output == expectedOutput self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url("app/rest/changes/1234") })) def test_getBuildChanges(self): self.teamcity.session.send.side_effect = [ test.mocks.teamcity.Response(json.dumps({ 'change': [{ 'id': '1234', }], })), test.mocks.teamcity.Response(json.dumps({ 'username': 'email@bitcoinabc.org', 'user': { 'name': 'Author Name', }, })), ] output = self.teamcity.getBuildChanges('2345') assert output[0]['username'] == 'email@bitcoinabc.org' assert output[0]['user']['name'] == 'Author Name' calls = [mock.call(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/changes", { "locator": "build:(id:2345)", "fields": "change(id)", } ) })), mock.call(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url("app/rest/changes/1234") }))] self.teamcity.session.send.assert_has_calls(calls, any_order=False) def test_getBuildInfo(self): self.teamcity.session.send.return_value = test.mocks.teamcity.buildInfo( properties=test.mocks.teamcity.buildInfo_properties([{ 'name': 'env.ABC_BUILD_NAME', 'value': 'build-diff', }]), changes=test.mocks.teamcity.buildInfo_changes( ['101298f9325ddbac7e5a8f405e5e2f24a64e5171']), ) buildInfo = self.teamcity.getBuildInfo('1234') assert buildInfo['triggered']['type'] == 'vcs' assert buildInfo.getProperties().get('env.ABC_BUILD_NAME') == 'build-diff' assert buildInfo.getCommits( )[0] == '101298f9325ddbac7e5a8f405e5e2f24a64e5171' self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/builds", { "locator": "id:1234", "fields": "build(*,changes(*),properties(*),triggered(*))", } ) })) def test_getBuildInfo_noInfo(self): self.teamcity.session.send.return_value = test.mocks.teamcity.Response( json.dumps({})) buildInfo = self.teamcity.getBuildInfo('1234') assert buildInfo.get('triggered', None) is None assert buildInfo.getProperties() is None self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/builds", { "locator": "id:1234", "fields": "build(*,changes(*),properties(*),triggered(*))", } ) })) def test_buildTriggeredByAutomatedUser(self): self.teamcity.session.send.return_value = test.mocks.teamcity.buildInfo_automatedBuild() buildInfo = self.teamcity.getBuildInfo('1234') self.assertTrue(self.teamcity.checkBuildIsAutomated(buildInfo)) self.assertFalse(self.teamcity.checkBuildIsScheduled(buildInfo)) def test_buildTriggeredManually(self): self.teamcity.session.send.return_value = test.mocks.teamcity.buildInfo_userBuild() buildInfo = self.teamcity.getBuildInfo('1234') self.assertFalse(self.teamcity.checkBuildIsAutomated(buildInfo)) self.assertFalse(self.teamcity.checkBuildIsScheduled(buildInfo)) def test_buildTriggeredBySchedule(self): self.teamcity.session.send.return_value = test.mocks.teamcity.buildInfo_scheduledBuild() buildInfo = self.teamcity.getBuildInfo('1234') self.assertTrue(self.teamcity.checkBuildIsAutomated(buildInfo)) self.assertTrue(self.teamcity.checkBuildIsScheduled(buildInfo)) def test_buildTriggeredByVcsCheckin(self): self.teamcity.session.send.return_value = test.mocks.teamcity.buildInfo_vcsCheckinBuild() buildInfo = self.teamcity.getBuildInfo('1234') self.assertTrue(self.teamcity.checkBuildIsAutomated(buildInfo)) self.assertFalse(self.teamcity.checkBuildIsScheduled(buildInfo)) def test_getLatestBuildAndTestFailures(self): self.teamcity.session.send.side_effect = [ test.mocks.teamcity.Response(json.dumps({ 'problemOccurrence': [{ 'id': 'id:2500,build:(id:1000)', 'details': 'build-details', 'build': { 'buildTypeId': 'build1', }, }, { 'id': 'id:2501,build:(id:1001)', 'details': 'build-details', 'build': { 'buildTypeId': 'build2', }, }], })), test.mocks.teamcity.Response(json.dumps({ 'testOccurrence': [{ 'id': 'id:2501,build:(id:1001)', 'details': 'test-details', 'build': { 'buildTypeId': 'build2', }, }, { 'id': 'id:2502,build:(id:1002)', 'details': 'test-details', 'build': { 'buildTypeId': 'build3', }, }], })), ] (buildFailures, testFailures) = self.teamcity.getLatestBuildAndTestFailures( 'BitcoinABC_Master') assert len(buildFailures) == 2 assert len(testFailures) == 2 teamcityCalls = [mock.call(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/problemOccurrences", { "locator": "currentlyFailing:true,affectedProject:(id:BitcoinABC_Master)", "fields": "problemOccurrence(*)", } ) })), mock.call(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/testOccurrences", { "locator": "currentlyFailing:true,affectedProject:(id:BitcoinABC_Master)", "fields": "testOccurrence(*)", } ) }))] self.teamcity.session.send.assert_has_calls( teamcityCalls, any_order=False) def test_getLatestCompletedBuild(self): def call_getLastCompletedBuild(): output = self.teamcity.getLatestCompletedBuild('1234') self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/builds", { "locator": "buildType:1234", "fields": "build(id)", "count": 1, } ) })) return output # No build completed yet self.teamcity.session.send.return_value.content = json.dumps({ 'build': [], }) self.assertEqual(call_getLastCompletedBuild(), None) # A build completed self.teamcity.session.send.return_value.content = json.dumps({ 'build': [{ 'id': 1234, }], }) build = call_getLastCompletedBuild() self.assertEqual(build["id"], 1234) def test_formatTime(self): assert self.teamcity.formatTime(1590000000) == '20200520T184000+0000' def test_getNumAggregateFailuresSince(self): self.teamcity.setMockTime(1590000000) self.teamcity.session.send.return_value.content = json.dumps({ 'build': [], }) assert self.teamcity.getNumAggregateFailuresSince('buildType', 0) == 0 self.teamcity.session.send.return_value.content = json.dumps({ 'build': [ {'status': 'SUCCESS'}, {'status': 'SUCCESS'}, {'status': 'SUCCESS'}, ], }) assert self.teamcity.getNumAggregateFailuresSince('buildType', 0) == 0 self.teamcity.session.send.return_value.content = json.dumps({ 'build': [{'status': 'FAILURE'}], }) assert self.teamcity.getNumAggregateFailuresSince('buildType', 0) == 1 self.teamcity.session.send.return_value.content = json.dumps({ 'build': [ {'status': 'FAILURE'}, {'status': 'FAILURE'}, {'status': 'FAILURE'}, ] }) assert self.teamcity.getNumAggregateFailuresSince('buildType', 0) == 1 self.teamcity.session.send.return_value.content = json.dumps({ 'build': [ {'status': 'FAILURE'}, {'status': 'FAILURE'}, {'status': 'SUCCESS'}, {'status': 'FAILURE'}, ] }) assert self.teamcity.getNumAggregateFailuresSince('buildType', 0) == 2 self.teamcity.session.send.return_value.content = json.dumps({ 'build': [ {'status': 'SUCCESS'}, {'status': 'FAILURE'}, {'status': 'FAILURE'}, {'status': 'SUCCESS'}, {'status': 'FAILURE'}, {'status': 'FAILURE'}, {'status': 'FAILURE'}, {'status': 'SUCCESS'}, {'status': 'FAILURE'}, {'status': 'SUCCESS'}, ] }) assert self.teamcity.getNumAggregateFailuresSince( 'buildType', 10000000) == 3 self.teamcity.session.send.assert_called_with(AnyWith(requests.PreparedRequest, { 'url': self.teamcity.build_url( "app/rest/builds", { "locator": "buildType:{},sinceDate:{}".format('buildType', self.teamcity.formatTime(1580000000)), "fields": "build", } ) })) def test_associate_configuration_names(self): project_id = "Project" def configure_build_types(start=0, stop=10, project=project_id): self.teamcity.session.send.return_value.content = json.dumps({ "buildType": [ { "id": "{}_Build{}".format(project, i), "name": "My build {}".format(i), "project": { "id": "Root_{}".format(project), "name": "My project {}".format(project) }, "parameters": { "property": [ { "name": "env.ABC_BUILD_NAME", "value": "build-{}".format(i) } ] } } for i in range(start, stop) ] }) def call_associate_configuration_names( build_names, project=project_id): config = self.teamcity.associate_configuration_names( project, build_names) self.teamcity.session.send.assert_called() return config build_names = ["build-{}".format(i) for i in range(3)] # No build type configured configure_build_types(0, 0) config = call_associate_configuration_names(build_names) self.assertDictEqual(config, {}) # No matching build configuration configure_build_types(4, 10) config = call_associate_configuration_names(build_names) self.assertDictEqual(config, {}) # Partial match configure_build_types(2, 10) config = call_associate_configuration_names(build_names) self.assertDictEqual(config, { "build-2": { "teamcity_build_type_id": "Project_Build2", "teamcity_build_name": "My build 2", "teamcity_project_id": "Root_Project", "teamcity_project_name": "My project Project", }, } ) # Full match, change project name project_id = "OtherProject" configure_build_types(0, 10, project=project_id) config = call_associate_configuration_names( build_names, project=project_id) self.assertDictEqual(config, { "build-0": { "teamcity_build_type_id": "OtherProject_Build0", "teamcity_build_name": "My build 0", "teamcity_project_id": "Root_OtherProject", "teamcity_project_name": "My project OtherProject", }, "build-1": { "teamcity_build_type_id": "OtherProject_Build1", "teamcity_build_name": "My build 1", "teamcity_project_id": "Root_OtherProject", "teamcity_project_name": "My project OtherProject", }, "build-2": { "teamcity_build_type_id": "OtherProject_Build2", "teamcity_build_name": "My build 2", "teamcity_project_id": "Root_OtherProject", "teamcity_project_name": "My project OtherProject", }, } ) if __name__ == '__main__': unittest.main()