Techdebt: Replace sure with regular assertions in CodeBuild/CodeCommit/CodePipeline (#6495)

This commit is contained in:
Bert Blommers 2023-07-08 10:25:06 +00:00 committed by GitHub
parent a84fbd8c95
commit 426a8ad5ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 286 additions and 304 deletions

View File

@ -611,7 +611,7 @@ def test_custom_timestamp():
],
)
cw.get_metric_statistics(
resp = cw.get_metric_statistics(
Namespace="tester",
MetricName="metric",
StartTime=utc_now - timedelta(seconds=60),
@ -619,7 +619,7 @@ def test_custom_timestamp():
Period=60,
Statistics=["SampleCount", "Sum"],
)
# TODO: What are we actually testing here?
assert resp["Datapoints"] == []
@mock_cloudwatch

View File

@ -1,5 +1,4 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_codebuild
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from botocore.exceptions import ClientError, ParamValidationError
@ -30,33 +29,25 @@ def test_codebuild_create_project_s3_artifacts():
f"arn:aws:iam::{ACCOUNT_ID}:role/service-role/my-codebuild-service-role"
)
response = client.create_project(
project = client.create_project(
name=name,
source=source,
artifacts=artifacts,
environment=environment,
serviceRole=service_role,
)
)["project"]
response.should.have.key("project")
response["project"].should.have.key("serviceRole")
response["project"].should.have.key("name").equals(name)
assert "serviceRole" in project
assert project["name"] == name
response["project"]["environment"].should.equal(
{
"computeType": "BUILD_GENERAL1_SMALL",
"image": "contents_not_validated",
"type": "LINUX_CONTAINER",
}
)
assert project["environment"] == {
"computeType": "BUILD_GENERAL1_SMALL",
"image": "contents_not_validated",
"type": "LINUX_CONTAINER",
}
response["project"]["source"].should.equal(
{"location": "bucketname/path/file.zip", "type": "S3"}
)
response["project"]["artifacts"].should.equal(
{"location": "bucketname", "type": "S3"}
)
assert project["source"] == {"location": "bucketname/path/file.zip", "type": "S3"}
assert project["artifacts"] == {"location": "bucketname", "type": "S3"}
@mock_codebuild
@ -79,31 +70,26 @@ def test_codebuild_create_project_no_artifacts():
f"arn:aws:iam::{ACCOUNT_ID}:role/service-role/my-codebuild-service-role"
)
response = client.create_project(
project = client.create_project(
name=name,
source=source,
artifacts=artifacts,
environment=environment,
serviceRole=service_role,
)
)["project"]
response.should.have.key("project")
response["project"].should.have.key("serviceRole")
response["project"].should.have.key("name").equals(name)
assert "serviceRole" in project
assert project["name"] == name
response["project"]["environment"].should.equal(
{
"computeType": "BUILD_GENERAL1_SMALL",
"image": "contents_not_validated",
"type": "LINUX_CONTAINER",
}
)
assert project["environment"] == {
"computeType": "BUILD_GENERAL1_SMALL",
"image": "contents_not_validated",
"type": "LINUX_CONTAINER",
}
response["project"]["source"].should.equal(
{"location": "bucketname/path/file.zip", "type": "S3"}
)
assert project["source"] == {"location": "bucketname/path/file.zip", "type": "S3"}
response["project"]["artifacts"].should.equal({"type": "NO_ARTIFACTS"})
assert project["artifacts"] == {"type": "NO_ARTIFACTS"}
@mock_codebuild
@ -124,18 +110,18 @@ def test_codebuild_create_project_with_invalid_inputs():
# Name too long
with pytest.raises(client.exceptions.from_code("InvalidInputException")) as err:
client.create_project(name=("some_project_" * 12), **_input)
err.value.response["Error"]["Code"].should.equal("InvalidInputException")
assert err.value.response["Error"]["Code"] == "InvalidInputException"
# Name invalid
with pytest.raises(client.exceptions.from_code("InvalidInputException")) as err:
client.create_project(name="!some_project_", **_input)
err.value.response["Error"]["Code"].should.equal("InvalidInputException")
assert err.value.response["Error"]["Code"] == "InvalidInputException"
# ServiceRole invalid
_input["serviceRole"] = "arn:aws:iam::0000:role/service-role/my-role"
with pytest.raises(client.exceptions.from_code("InvalidInputException")) as err:
client.create_project(name="valid_name", **_input)
err.value.response["Error"]["Code"].should.equal("InvalidInputException")
assert err.value.response["Error"]["Code"] == "InvalidInputException"
@mock_codebuild
@ -173,7 +159,7 @@ def test_codebuild_create_project_when_exists():
environment=environment,
serviceRole=service_role,
)
err.value.response["Error"]["Code"].should.equal("ResourceAlreadyExistsException")
assert err.value.response["Error"]["Code"] == "ResourceAlreadyExistsException"
@mock_codebuild
@ -214,7 +200,7 @@ def test_codebuild_list_projects():
projects = client.list_projects()
projects["projects"].should.equal(["project1", "project2"])
assert projects["projects"] == ["project1", "project2"]
@mock_codebuild
@ -246,7 +232,7 @@ def test_codebuild_list_builds_for_project_no_history():
history = client.list_builds_for_project(projectName=name)
# no build history if it's never started
history["ids"].should.equal([])
assert history["ids"] == []
@mock_codebuild
@ -278,7 +264,7 @@ def test_codebuild_list_builds_for_project_with_history():
client.start_build(projectName=name)
response = client.list_builds_for_project(projectName=name)
response["ids"].should.have.length_of(1)
assert len(response["ids"]) == 1
# project never started
@ -310,11 +296,11 @@ def test_codebuild_get_batch_builds_for_project_no_history():
)
response = client.list_builds_for_project(projectName=name)
response["ids"].should.equal([])
assert response["ids"] == []
with pytest.raises(ParamValidationError) as err:
client.batch_get_builds(ids=response["ids"])
err.typename.should.equal("ParamValidationError")
assert err.typename == "ParamValidationError"
@mock_codebuild
@ -325,7 +311,7 @@ def test_codebuild_start_build_no_project():
with pytest.raises(client.exceptions.from_code("ResourceNotFoundException")) as err:
client.start_build(projectName=name)
err.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
assert err.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_codebuild
@ -356,8 +342,7 @@ def test_codebuild_start_build_no_overrides():
)
response = client.start_build(projectName=name)
response.should.have.key("build")
response["build"]["sourceVersion"].should.equal("refs/heads/main")
assert response["build"]["sourceVersion"] == "refs/heads/main"
@mock_codebuild
@ -391,7 +376,7 @@ def test_codebuild_start_build_multiple_times():
client.start_build(projectName=name)
client.start_build(projectName=name)
len(client.list_builds()["ids"]).should.equal(3)
assert len(client.list_builds()["ids"]) == 3
@mock_codebuild
@ -429,8 +414,7 @@ def test_codebuild_start_build_with_overrides():
artifactsOverride=artifacts_override,
)
response.should.have.key("build")
response["build"]["sourceVersion"].should.equal("fix/testing")
assert response["build"]["sourceVersion"] == "fix/testing"
@mock_codebuild
@ -464,11 +448,11 @@ def test_codebuild_batch_get_builds_1_project():
history = client.list_builds_for_project(projectName=name)
response = client.batch_get_builds(ids=history["ids"])
response.should.have.key("builds").length_of(1)
response["builds"][0]["currentPhase"].should.equal("COMPLETED")
response["builds"][0]["buildNumber"].should.be.a(int)
response["builds"][0].should.have.key("phases")
len(response["builds"][0]["phases"]).should.equal(11)
assert len(response["builds"]) == 1
assert response["builds"][0]["currentPhase"] == "COMPLETED"
assert isinstance(response["builds"][0]["buildNumber"], int)
assert "phases" in response["builds"][0]
assert len(response["builds"][0]["phases"]) == 11
@mock_codebuild
@ -508,16 +492,16 @@ def test_codebuild_batch_get_builds_2_projects():
client.start_build(projectName="project-2")
response = client.list_builds()
response["ids"].should.have.length_of(2)
assert len(response["ids"]) == 2
"project-1".should.be.within(response["ids"][0])
"project-2".should.be.within(response["ids"][1])
assert "project-1" in response["ids"][0]
assert "project-2" in response["ids"][1]
metadata = client.batch_get_builds(ids=response["ids"])["builds"]
metadata.should.have.length_of(2)
assert len(metadata) == 2
"project-1".should.be.within(metadata[0]["id"])
"project-2".should.be.within(metadata[1]["id"])
assert "project-1" in metadata[0]["id"]
assert "project-2" in metadata[1]["id"]
@mock_codebuild
@ -526,7 +510,7 @@ def test_codebuild_batch_get_builds_invalid_build_id():
with pytest.raises(client.exceptions.InvalidInputException) as err:
client.batch_get_builds(ids=[f"some_project{uuid1()}"])
err.value.response["Error"]["Code"].should.equal("InvalidInputException")
assert err.value.response["Error"]["Code"] == "InvalidInputException"
@mock_codebuild
@ -535,7 +519,7 @@ def test_codebuild_batch_get_builds_empty_build_id():
with pytest.raises(ParamValidationError) as err:
client.batch_get_builds(ids=[])
err.typename.should.equal("ParamValidationError")
assert err.typename == "ParamValidationError"
@mock_codebuild
@ -567,13 +551,13 @@ def test_codebuild_delete_project():
client.start_build(projectName=name)
response = client.list_builds_for_project(projectName=name)
response["ids"].should.have.length_of(1)
assert len(response["ids"]) == 1
client.delete_project(name=name)
with pytest.raises(ClientError) as err:
client.list_builds_for_project(projectName=name)
err.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
assert err.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_codebuild
@ -607,7 +591,7 @@ def test_codebuild_stop_build():
builds = client.list_builds()
response = client.stop_build(id=builds["ids"][0])
response["build"]["buildStatus"].should.equal("STOPPED")
assert response["build"]["buildStatus"] == "STOPPED"
@mock_codebuild
@ -616,7 +600,7 @@ def test_codebuild_stop_build_no_build():
with pytest.raises(client.exceptions.ResourceNotFoundException) as err:
client.stop_build(id=f"some_project:{uuid1()}")
err.value.response["Error"]["Code"].should.equal("ResourceNotFoundException")
assert err.value.response["Error"]["Code"] == "ResourceNotFoundException"
@mock_codebuild
@ -625,4 +609,4 @@ def test_codebuild_stop_build_bad_uid():
with pytest.raises(client.exceptions.InvalidInputException) as err:
client.stop_build(id=f"some_project{uuid1()}")
err.value.response["Error"]["Code"].should.equal("InvalidInputException")
assert err.value.response["Error"]["Code"] == "InvalidInputException"

View File

@ -1,6 +1,5 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_codecommit
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from botocore.exceptions import ClientError
@ -10,57 +9,57 @@ import pytest
@mock_codecommit
def test_create_repository():
client = boto3.client("codecommit", region_name="eu-central-1")
response = client.create_repository(
metadata = client.create_repository(
repositoryName="repository_one", repositoryDescription="description repo one"
)
)["repositoryMetadata"]
response.should_not.be.none
response["repositoryMetadata"].should_not.be.none
response["repositoryMetadata"]["creationDate"].should_not.be.none
response["repositoryMetadata"]["lastModifiedDate"].should_not.be.none
response["repositoryMetadata"]["repositoryId"].should_not.be.empty
response["repositoryMetadata"]["repositoryName"].should.equal("repository_one")
response["repositoryMetadata"]["repositoryDescription"].should.equal(
"description repo one"
assert metadata["creationDate"] is not None
assert metadata["lastModifiedDate"] is not None
assert metadata["repositoryId"] is not None
assert metadata["repositoryName"] == "repository_one"
assert metadata["repositoryDescription"] == "description repo one"
assert (
metadata["cloneUrlSsh"]
== "ssh://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
)
response["repositoryMetadata"]["cloneUrlSsh"].should.equal(
"ssh://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
assert (
metadata["cloneUrlHttp"]
== "https://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
)
response["repositoryMetadata"]["cloneUrlHttp"].should.equal(
"https://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
assert (
metadata["Arn"]
== f"arn:aws:codecommit:eu-central-1:{ACCOUNT_ID}:repository_one"
)
response["repositoryMetadata"]["Arn"].should.equal(
f"arn:aws:codecommit:eu-central-1:{ACCOUNT_ID}:repository_one"
)
response["repositoryMetadata"]["accountId"].should.equal(ACCOUNT_ID)
assert metadata["accountId"] == ACCOUNT_ID
@mock_codecommit
def test_create_repository_without_description():
client = boto3.client("codecommit", region_name="eu-central-1")
response = client.create_repository(repositoryName="repository_two")
metadata = client.create_repository(repositoryName="repository_two")[
"repositoryMetadata"
]
response.should_not.be.none
response.get("repositoryMetadata").should_not.be.none
response.get("repositoryMetadata").get("repositoryName").should.equal(
"repository_two"
assert metadata.get("repositoryName") == "repository_two"
assert metadata.get("repositoryDescription") is None
assert metadata["creationDate"] is not None
assert metadata["lastModifiedDate"] is not None
assert metadata["repositoryId"] is not None
assert (
metadata["cloneUrlSsh"]
== "ssh://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_two"
)
response.get("repositoryMetadata").get("repositoryDescription").should.be.none
response["repositoryMetadata"].should_not.be.none
response["repositoryMetadata"]["creationDate"].should_not.be.none
response["repositoryMetadata"]["lastModifiedDate"].should_not.be.none
response["repositoryMetadata"]["repositoryId"].should_not.be.empty
response["repositoryMetadata"]["cloneUrlSsh"].should.equal(
"ssh://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_two"
assert (
metadata["cloneUrlHttp"]
== "https://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_two"
)
response["repositoryMetadata"]["cloneUrlHttp"].should.equal(
"https://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_two"
assert (
metadata["Arn"]
== f"arn:aws:codecommit:eu-central-1:{ACCOUNT_ID}:repository_two"
)
response["repositoryMetadata"]["Arn"].should.equal(
f"arn:aws:codecommit:eu-central-1:{ACCOUNT_ID}:repository_two"
)
response["repositoryMetadata"]["accountId"].should.equal(ACCOUNT_ID)
assert metadata["accountId"] == ACCOUNT_ID
@mock_codecommit
@ -75,11 +74,12 @@ def test_create_repository_repository_name_exists():
repositoryDescription="description repo two",
)
ex = e.value
ex.operation_name.should.equal("CreateRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNameExistsException")
ex.response["Error"]["Message"].should.equal(
"Repository named repository_two already exists"
assert ex.operation_name == "CreateRepository"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "RepositoryNameExistsException"
assert (
ex.response["Error"]["Message"]
== "Repository named repository_two already exists"
)
@ -90,15 +90,12 @@ def test_create_repository_invalid_repository_name():
with pytest.raises(ClientError) as e:
client.create_repository(repositoryName="in_123_valid_@#$_characters")
ex = e.value
ex.operation_name.should.equal("CreateRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidRepositoryNameException")
ex.response["Error"]["Message"].should.equal(
"The repository name is not valid. Repository names can be any valid "
"combination of letters, numbers, "
"periods, underscores, and dashes between 1 and 100 characters in "
"length. Names are case sensitive. "
"For more information, see Limits in the AWS CodeCommit User Guide. "
assert ex.operation_name == "CreateRepository"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidRepositoryNameException"
assert (
ex.response["Error"]["Message"]
== "The repository name is not valid. Repository names can be any valid combination of letters, numbers, periods, underscores, and dashes between 1 and 100 characters in length. Names are case sensitive. For more information, see Limits in the AWS CodeCommit User Guide. "
)
@ -112,39 +109,38 @@ def test_get_repository():
repositoryName=repository_name, repositoryDescription="description repo one"
)
response = client.get_repository(repositoryName=repository_name)
metadata = client.get_repository(repositoryName=repository_name)[
"repositoryMetadata"
]
response.should_not.be.none
response.get("repositoryMetadata").should_not.be.none
response.get("repositoryMetadata").get("creationDate").should_not.be.none
response.get("repositoryMetadata").get("lastModifiedDate").should_not.be.none
response.get("repositoryMetadata").get("repositoryId").should_not.be.empty
response.get("repositoryMetadata").get("repositoryName").should.equal(
repository_name
assert metadata["creationDate"] is not None
assert metadata["lastModifiedDate"] is not None
assert metadata["repositoryId"] is not None
assert metadata["repositoryName"] == repository_name
assert metadata["repositoryDescription"] == "description repo one"
assert (
metadata["cloneUrlSsh"]
== "ssh://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
)
response.get("repositoryMetadata").get("repositoryDescription").should.equal(
"description repo one"
assert (
metadata["cloneUrlHttp"]
== "https://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
)
response.get("repositoryMetadata").get("cloneUrlSsh").should.equal(
"ssh://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
assert (
metadata["Arn"]
== f"arn:aws:codecommit:eu-central-1:{ACCOUNT_ID}:repository_one"
)
response.get("repositoryMetadata").get("cloneUrlHttp").should.equal(
"https://git-codecommit.eu-central-1.amazonaws.com/v1/repos/repository_one"
)
response.get("repositoryMetadata").get("Arn").should.equal(
f"arn:aws:codecommit:eu-central-1:{ACCOUNT_ID}:repository_one"
)
response.get("repositoryMetadata").get("accountId").should.equal(ACCOUNT_ID)
assert metadata["accountId"] == ACCOUNT_ID
client = boto3.client("codecommit", region_name="us-east-1")
with pytest.raises(ClientError) as e:
client.get_repository(repositoryName=repository_name)
ex = e.value
ex.operation_name.should.equal("GetRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryDoesNotExistException")
ex.response["Error"]["Message"].should.equal(f"{repository_name} does not exist")
assert ex.operation_name == "GetRepository"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "RepositoryDoesNotExistException"
assert ex.response["Error"]["Message"] == f"{repository_name} does not exist"
@mock_codecommit
@ -154,14 +150,11 @@ def test_get_repository_invalid_repository_name():
with pytest.raises(ClientError) as e:
client.get_repository(repositoryName="repository_one-@#@")
ex = e.value
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidRepositoryNameException")
ex.response["Error"]["Message"].should.equal(
"The repository name is not valid. Repository names can be any valid "
"combination of letters, numbers, "
"periods, underscores, and dashes between 1 and 100 characters in "
"length. Names are case sensitive. "
"For more information, see Limits in the AWS CodeCommit User Guide. "
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidRepositoryNameException"
assert (
ex.response["Error"]["Message"]
== "The repository name is not valid. Repository names can be any valid combination of letters, numbers, periods, underscores, and dashes between 1 and 100 characters in length. Names are case sensitive. For more information, see Limits in the AWS CodeCommit User Guide. "
)
@ -175,12 +168,12 @@ def test_delete_repository():
response = client.delete_repository(repositoryName="repository_one")
response.get("repositoryId").should_not.be.none
repository_id_create.should.equal(response.get("repositoryId"))
assert response.get("repositoryId") is not None
assert repository_id_create == response.get("repositoryId")
response = client.delete_repository(repositoryName="unknown_repository")
response.get("repositoryId").should.be.none
assert response.get("repositoryId") is None
@mock_codecommit
@ -190,13 +183,10 @@ def test_delete_repository_invalid_repository_name():
with pytest.raises(ClientError) as e:
client.delete_repository(repositoryName="_rep@ository_one")
ex = e.value
ex.operation_name.should.equal("DeleteRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidRepositoryNameException")
ex.response["Error"]["Message"].should.equal(
"The repository name is not valid. Repository names can be any valid "
"combination of letters, numbers, "
"periods, underscores, and dashes between 1 and 100 characters in "
"length. Names are case sensitive. "
"For more information, see Limits in the AWS CodeCommit User Guide. "
assert ex.operation_name == "DeleteRepository"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidRepositoryNameException"
assert (
ex.response["Error"]["Message"]
== "The repository name is not valid. Repository names can be any valid combination of letters, numbers, periods, underscores, and dashes between 1 and 100 characters in length. Names are case sensitive. For more information, see Limits in the AWS CodeCommit User Guide. "
)

View File

@ -3,7 +3,6 @@ from copy import deepcopy
from datetime import datetime
import boto3
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
import pytest
@ -68,8 +67,8 @@ def test_create_pipeline():
response = create_basic_codepipeline(client, "test-pipeline")
response["pipeline"].should.equal(expected_pipeline_details)
response["tags"].should.equal([{"key": "key", "value": "value"}])
assert response["pipeline"] == expected_pipeline_details
assert response["tags"] == [{"key": "key", "value": "value"}]
@mock_codepipeline
@ -82,11 +81,12 @@ def test_create_pipeline_errors():
with pytest.raises(ClientError) as e:
create_basic_codepipeline(client, "test-pipeline")
ex = e.value
ex.operation_name.should.equal("CreatePipeline")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidStructureException")
ex.response["Error"]["Message"].should.equal(
"A pipeline with the name 'test-pipeline' already exists in account '123456789012'"
assert ex.operation_name == "CreatePipeline"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidStructureException"
assert (
ex.response["Error"]["Message"]
== "A pipeline with the name 'test-pipeline' already exists in account '123456789012'"
)
with pytest.raises(ClientError) as e:
@ -118,11 +118,12 @@ def test_create_pipeline_errors():
}
)
ex = e.value
ex.operation_name.should.equal("CreatePipeline")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidStructureException")
ex.response["Error"]["Message"].should.equal(
"CodePipeline is not authorized to perform AssumeRole on role arn:aws:iam::123456789012:role/not-existing"
assert ex.operation_name == "CreatePipeline"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidStructureException"
assert (
ex.response["Error"]["Message"]
== "CodePipeline is not authorized to perform AssumeRole on role arn:aws:iam::123456789012:role/not-existing"
)
wrong_role_arn = client_iam.create_role(
@ -170,11 +171,12 @@ def test_create_pipeline_errors():
}
)
ex = e.value
ex.operation_name.should.equal("CreatePipeline")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidStructureException")
ex.response["Error"]["Message"].should.equal(
"CodePipeline is not authorized to perform AssumeRole on role arn:aws:iam::123456789012:role/wrong-role"
assert ex.operation_name == "CreatePipeline"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidStructureException"
assert (
ex.response["Error"]["Message"]
== "CodePipeline is not authorized to perform AssumeRole on role arn:aws:iam::123456789012:role/wrong-role"
)
with pytest.raises(ClientError) as e:
@ -206,11 +208,12 @@ def test_create_pipeline_errors():
}
)
ex = e.value
ex.operation_name.should.equal("CreatePipeline")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidStructureException")
ex.response["Error"]["Message"].should.equal(
"Pipeline has only 1 stage(s). There should be a minimum of 2 stages in a pipeline"
assert ex.operation_name == "CreatePipeline"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidStructureException"
assert (
ex.response["Error"]["Message"]
== "Pipeline has only 1 stage(s). There should be a minimum of 2 stages in a pipeline"
)
@ -221,12 +224,13 @@ def test_get_pipeline():
response = client.get_pipeline(name="test-pipeline")
response["pipeline"].should.equal(expected_pipeline_details)
response["metadata"]["pipelineArn"].should.equal(
"arn:aws:codepipeline:us-east-1:123456789012:test-pipeline"
assert response["pipeline"] == expected_pipeline_details
assert (
response["metadata"]["pipelineArn"]
== "arn:aws:codepipeline:us-east-1:123456789012:test-pipeline"
)
response["metadata"]["created"].should.be.a(datetime)
response["metadata"]["updated"].should.be.a(datetime)
assert isinstance(response["metadata"]["created"], datetime)
assert isinstance(response["metadata"]["updated"], datetime)
@mock_codepipeline
@ -236,11 +240,12 @@ def test_get_pipeline_errors():
with pytest.raises(ClientError) as e:
client.get_pipeline(name="not-existing")
ex = e.value
ex.operation_name.should.equal("GetPipeline")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("PipelineNotFoundException")
ex.response["Error"]["Message"].should.equal(
"Account '123456789012' does not have a pipeline with name 'not-existing'"
assert ex.operation_name == "GetPipeline"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "PipelineNotFoundException"
assert (
ex.response["Error"]["Message"]
== "Account '123456789012' does not have a pipeline with name 'not-existing'"
)
@ -299,62 +304,60 @@ def test_update_pipeline():
}
)
response["pipeline"].should.equal(
{
"name": "test-pipeline",
"roleArn": "arn:aws:iam::123456789012:role/test-role",
"artifactStore": {
"type": "S3",
"location": "codepipeline-us-east-1-123456789012",
assert response["pipeline"] == {
"name": "test-pipeline",
"roleArn": "arn:aws:iam::123456789012:role/test-role",
"artifactStore": {
"type": "S3",
"location": "codepipeline-us-east-1-123456789012",
},
"stages": [
{
"name": "Stage-1",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Source",
"owner": "AWS",
"provider": "S3",
"version": "1",
},
"runOrder": 1,
"configuration": {
"S3Bucket": "different-bucket",
"S3ObjectKey": "test-object",
},
"outputArtifacts": [{"name": "artifact"}],
"inputArtifacts": [],
}
],
},
"stages": [
{
"name": "Stage-1",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Source",
"owner": "AWS",
"provider": "S3",
"version": "1",
},
"runOrder": 1,
"configuration": {
"S3Bucket": "different-bucket",
"S3ObjectKey": "test-object",
},
"outputArtifacts": [{"name": "artifact"}],
"inputArtifacts": [],
}
],
},
{
"name": "Stage-2",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Approval",
"owner": "AWS",
"provider": "Manual",
"version": "1",
},
"runOrder": 1,
"configuration": {},
"outputArtifacts": [],
"inputArtifacts": [],
}
],
},
],
"version": 2,
}
)
{
"name": "Stage-2",
"actions": [
{
"name": "Action-1",
"actionTypeId": {
"category": "Approval",
"owner": "AWS",
"provider": "Manual",
"version": "1",
},
"runOrder": 1,
"configuration": {},
"outputArtifacts": [],
"inputArtifacts": [],
}
],
},
],
"version": 2,
}
metadata = client.get_pipeline(name="test-pipeline")["metadata"]
metadata["created"].should.equal(created_time)
metadata["updated"].should.be.greater_than(updated_time)
assert metadata["created"] == created_time
assert metadata["updated"] > updated_time
@mock_codepipeline
@ -408,11 +411,12 @@ def test_update_pipeline_errors():
}
)
ex = e.value
ex.operation_name.should.equal("UpdatePipeline")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
assert ex.operation_name == "UpdatePipeline"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
ex.response["Error"]["Message"]
== "The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
)
@ -426,15 +430,15 @@ def test_list_pipelines():
response = client.list_pipelines()
response["pipelines"].should.have.length_of(2)
response["pipelines"][0]["name"].should.equal(name_1)
response["pipelines"][0]["version"].should.equal(1)
response["pipelines"][0]["created"].should.be.a(datetime)
response["pipelines"][0]["updated"].should.be.a(datetime)
response["pipelines"][1]["name"].should.equal(name_2)
response["pipelines"][1]["version"].should.equal(1)
response["pipelines"][1]["created"].should.be.a(datetime)
response["pipelines"][1]["updated"].should.be.a(datetime)
assert len(response["pipelines"]) == 2
assert response["pipelines"][0]["name"] == name_1
assert response["pipelines"][0]["version"] == 1
assert isinstance(response["pipelines"][0]["created"], datetime)
assert isinstance(response["pipelines"][0]["updated"], datetime)
assert response["pipelines"][1]["name"] == name_2
assert response["pipelines"][1]["version"] == 1
assert isinstance(response["pipelines"][1]["created"], datetime)
assert isinstance(response["pipelines"][1]["updated"], datetime)
@mock_codepipeline
@ -442,11 +446,11 @@ def test_delete_pipeline():
client = boto3.client("codepipeline", region_name="us-east-1")
name = "test-pipeline"
create_basic_codepipeline(client, name)
client.list_pipelines()["pipelines"].should.have.length_of(1)
assert len(client.list_pipelines()["pipelines"]) == 1
client.delete_pipeline(name=name)
client.list_pipelines()["pipelines"].should.have.length_of(0)
assert len(client.list_pipelines()["pipelines"]) == 0
# deleting a not existing pipeline, should raise no exception
client.delete_pipeline(name=name)
@ -461,7 +465,7 @@ def test_list_tags_for_resource():
response = client.list_tags_for_resource(
resourceArn=f"arn:aws:codepipeline:us-east-1:123456789012:{name}"
)
response["tags"].should.equal([{"key": "key", "value": "value"}])
assert response["tags"] == [{"key": "key", "value": "value"}]
@mock_codepipeline
@ -473,11 +477,12 @@ def test_list_tags_for_resource_errors():
resourceArn="arn:aws:codepipeline:us-east-1:123456789012:not-existing"
)
ex = e.value
ex.operation_name.should.equal("ListTagsForResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
assert ex.operation_name == "ListTagsForResource"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
ex.response["Error"]["Message"]
== "The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
)
@ -495,9 +500,10 @@ def test_tag_resource():
response = client.list_tags_for_resource(
resourceArn=f"arn:aws:codepipeline:us-east-1:123456789012:{name}"
)
response["tags"].should.equal(
[{"key": "key", "value": "value"}, {"key": "key-2", "value": "value-2"}]
)
assert response["tags"] == [
{"key": "key", "value": "value"},
{"key": "key-2", "value": "value-2"},
]
@mock_codepipeline
@ -512,11 +518,12 @@ def test_tag_resource_errors():
tags=[{"key": "key-2", "value": "value-2"}],
)
ex = e.value
ex.operation_name.should.equal("TagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
assert ex.operation_name == "TagResource"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
ex.response["Error"]["Message"]
== "The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
)
with pytest.raises(ClientError) as e:
@ -525,13 +532,12 @@ def test_tag_resource_errors():
tags=[{"key": "aws:key", "value": "value"}],
)
ex = e.value
ex.operation_name.should.equal("TagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidTagsException")
ex.response["Error"]["Message"].should.equal(
"Not allowed to modify system tags. "
"System tags start with 'aws:'. "
"msg=[Caller is an end user and not allowed to mutate system tags]"
assert ex.operation_name == "TagResource"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "InvalidTagsException"
assert (
ex.response["Error"]["Message"]
== "Not allowed to modify system tags. System tags start with 'aws:'. msg=[Caller is an end user and not allowed to mutate system tags]"
)
with pytest.raises(ClientError) as e:
@ -540,11 +546,12 @@ def test_tag_resource_errors():
tags=[{"key": f"key-{i}", "value": f"value-{i}"} for i in range(50)],
)
ex = e.value
ex.operation_name.should.equal("TagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("TooManyTagsException")
ex.response["Error"]["Message"].should.equal(
f"Tag limit exceeded for resource [arn:aws:codepipeline:us-east-1:123456789012:{name}]."
assert ex.operation_name == "TagResource"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "TooManyTagsException"
assert (
ex.response["Error"]["Message"]
== f"Tag limit exceeded for resource [arn:aws:codepipeline:us-east-1:123456789012:{name}]."
)
@ -557,7 +564,7 @@ def test_untag_resource():
response = client.list_tags_for_resource(
resourceArn=f"arn:aws:codepipeline:us-east-1:123456789012:{name}"
)
response["tags"].should.equal([{"key": "key", "value": "value"}])
assert response["tags"] == [{"key": "key", "value": "value"}]
client.untag_resource(
resourceArn=f"arn:aws:codepipeline:us-east-1:123456789012:{name}",
@ -567,7 +574,7 @@ def test_untag_resource():
response = client.list_tags_for_resource(
resourceArn=f"arn:aws:codepipeline:us-east-1:123456789012:{name}"
)
response["tags"].should.have.length_of(0)
assert len(response["tags"]) == 0
# removing a not existing tag should raise no exception
client.untag_resource(
@ -586,11 +593,12 @@ def test_untag_resource_errors():
tagKeys=["key"],
)
ex = e.value
ex.operation_name.should.equal("UntagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("ResourceNotFoundException")
ex.response["Error"]["Message"].should.equal(
"The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
assert ex.operation_name == "UntagResource"
assert ex.response["ResponseMetadata"]["HTTPStatusCode"] == 400
assert ex.response["Error"]["Code"] == "ResourceNotFoundException"
assert (
ex.response["Error"]["Message"]
== "The account with id '123456789012' does not include a pipeline with the name 'not-existing'"
)
@ -702,8 +710,8 @@ def test_create_pipeline_with_extended_trust_policy():
extended_pipeline_details = deepcopy(expected_pipeline_details)
extended_pipeline_details["roleArn"] = role_arn
response["pipeline"].should.equal(extended_pipeline_details)
response["tags"].should.equal([{"key": "key", "value": "value"}])
assert response["pipeline"] == extended_pipeline_details
assert response["tags"] == [{"key": "key", "value": "value"}]
@mock_codepipeline
@ -712,5 +720,5 @@ def test_create_pipeline_without_tags():
response = create_basic_codepipeline(client, "test-pipeline", tags=[])
response["pipeline"].should.equal(expected_pipeline_details)
response["tags"].should.equal([])
assert response["pipeline"] == expected_pipeline_details
assert response["tags"] == []