Add ecr tag mutability & image scan config (#4139)

* Add ecr.put_image_tag_mutability

* Add ecr.put_image_scanning_configuration
This commit is contained in:
Anton Grübel 2021-08-05 22:23:11 +09:00 committed by GitHub
parent 79f0cc9e9e
commit cadbee35cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 266 additions and 44 deletions

View File

@ -6,8 +6,8 @@ class RepositoryAlreadyExistsException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super(RepositoryAlreadyExistsException, self).__init__(
error_type="RepositoryAlreadyExistsException",
super().__init__(
error_type=__class__.__name__,
message=(
f"The repository with name '{repository_name}' already exists "
f"in the registry with id '{registry_id}'"
@ -19,8 +19,8 @@ class RepositoryNotEmptyException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super(RepositoryNotEmptyException, self).__init__(
error_type="RepositoryNotEmptyException",
super().__init__(
error_type=__class__.__name__,
message=(
f"The repository with name '{repository_name}' "
f"in registry with id '{registry_id}' "
@ -33,8 +33,8 @@ class RepositoryNotFoundException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super(RepositoryNotFoundException, self).__init__(
error_type="RepositoryNotFoundException",
super().__init__(
error_type=__class__.__name__,
message=(
f"The repository with name '{repository_name}' does not exist "
f"in the registry with id '{registry_id}'"
@ -46,10 +46,18 @@ class ImageNotFoundException(JsonRESTError):
code = 400
def __init__(self, image_id, repository_name, registry_id):
super(ImageNotFoundException, self).__init__(
error_type="ImageNotFoundException",
message="The image with imageId {0} does not exist within the repository with name '{1}' "
"in the registry with id '{2}'".format(
image_id, repository_name, registry_id
super().__init__(
error_type=__class__.__name__,
message=(
f"The image with imageId {image_id} does not exist "
f"within the repository with name '{repository_name}' "
f"in the registry with id '{registry_id}'"
),
)
class InvalidParameterException(JsonRESTError):
code = 400
def __init__(self, message):
super().__init__(error_type=__class__.__name__, message=message)

View File

@ -3,6 +3,7 @@ from __future__ import unicode_literals
import hashlib
import re
import uuid
from collections import namedtuple
from datetime import datetime
from random import random
@ -16,10 +17,16 @@ from moto.ecr.exceptions import (
RepositoryNotFoundException,
RepositoryAlreadyExistsException,
RepositoryNotEmptyException,
InvalidParameterException,
)
from moto.utilities.tagging_service import TaggingService
DEFAULT_REGISTRY_ID = ACCOUNT_ID
ECR_REPOSITORY_ARN_PATTERN = "^arn:(?P<partition>[^:]+):ecr:(?P<region>[^:]+):(?P<account_id>[^:]+):repository/(?P<repo_name>.*)$"
EcrRepositoryArn = namedtuple(
"EcrRepositoryArn", ["partition", "region", "account_id", "repo_name"]
)
class BaseObject(BaseModel):
@ -99,9 +106,9 @@ class Repository(BaseObject, CloudFormationModel):
del response_object["arn"], response_object["name"], response_object["images"]
return response_object
def update(self, image_scan_config, image_tag_mutability):
def update(self, image_scan_config=None, image_tag_mutability=None):
if image_scan_config:
self.image_scan_config = image_scan_config
self.image_scanning_configuration = image_scan_config
if image_tag_mutability:
self.image_tag_mutability = image_tag_mutability
@ -287,6 +294,24 @@ class ECRBackend(BaseBackend):
self.__dict__ = {}
self.__init__(region_name)
def _get_repository(self, name, registry_id=None):
repo = self.repositories.get(name)
reg_id = registry_id or DEFAULT_REGISTRY_ID
if not repo or repo.registry_id != reg_id:
raise RepositoryNotFoundException(name, reg_id)
return repo
@staticmethod
def _parse_resource_arn(resource_arn) -> EcrRepositoryArn:
match = re.match(ECR_REPOSITORY_ARN_PATTERN, resource_arn)
if not match:
raise InvalidParameterException(
"Invalid parameter at 'resourceArn' failed to satisfy constraint: "
"'Invalid ARN'"
)
return EcrRepositoryArn(**match.groupdict())
def describe_repositories(self, registry_id=None, repository_names=None):
"""
maxResults and nextToken not implemented
@ -336,20 +361,16 @@ class ECRBackend(BaseBackend):
return repository
def delete_repository(self, repository_name, registry_id=None):
repo = self.repositories.get(repository_name)
if repo:
if repo.images:
raise RepositoryNotEmptyException(
repository_name, registry_id or DEFAULT_REGISTRY_ID
)
repo = self._get_repository(repository_name, registry_id)
self.tagger.delete_all_tags_for_resource(repo.arn)
return self.repositories.pop(repository_name)
else:
raise RepositoryNotFoundException(
if repo.images:
raise RepositoryNotEmptyException(
repository_name, registry_id or DEFAULT_REGISTRY_ID
)
self.tagger.delete_all_tags_for_resource(repo.arn)
return self.repositories.pop(repository_name)
def list_images(self, repository_name, registry_id=None):
"""
maxResults and filtering not implemented
@ -588,33 +609,54 @@ class ECRBackend(BaseBackend):
return response
def list_tags_for_resource(self, arn):
name = arn.split("/")[-1]
resource = self._parse_resource_arn(arn)
repo = self._get_repository(resource.repo_name, resource.account_id)
repo = self.repositories.get(name)
if repo:
return self.tagger.list_tags_for_resource(repo.arn)
else:
raise RepositoryNotFoundException(name, DEFAULT_REGISTRY_ID)
return self.tagger.list_tags_for_resource(repo.arn)
def tag_resource(self, arn, tags):
name = arn.split("/")[-1]
resource = self._parse_resource_arn(arn)
repo = self._get_repository(resource.repo_name, resource.account_id)
self.tagger.tag_resource(repo.arn, tags)
repo = self.repositories.get(name)
if repo:
self.tagger.tag_resource(repo.arn, tags)
return {}
else:
raise RepositoryNotFoundException(name, DEFAULT_REGISTRY_ID)
return {}
def untag_resource(self, arn, tag_keys):
name = arn.split("/")[-1]
resource = self._parse_resource_arn(arn)
repo = self._get_repository(resource.repo_name, resource.account_id)
self.tagger.untag_resource_using_names(repo.arn, tag_keys)
repo = self.repositories.get(name)
if repo:
self.tagger.untag_resource_using_names(repo.arn, tag_keys)
return {}
else:
raise RepositoryNotFoundException(name, DEFAULT_REGISTRY_ID)
return {}
def put_image_tag_mutability(
self, registry_id, repository_name, image_tag_mutability
):
if image_tag_mutability not in ["IMMUTABLE", "MUTABLE"]:
raise InvalidParameterException(
"Invalid parameter at 'imageTagMutability' failed to satisfy constraint: "
"'Member must satisfy enum value set: [IMMUTABLE, MUTABLE]'"
)
repo = self._get_repository(repository_name, registry_id)
repo.update(image_tag_mutability=image_tag_mutability)
return {
"registryId": repo.registry_id,
"repositoryName": repository_name,
"imageTagMutability": repo.image_tag_mutability,
}
def put_image_scanning_configuration(
self, registry_id, repository_name, image_scan_config
):
repo = self._get_repository(repository_name, registry_id)
repo.update(image_scan_config=image_scan_config)
return {
"registryId": repo.registry_id,
"repositoryName": repository_name,
"imageScanningConfiguration": repo.image_scanning_configuration,
}
ecr_backends = {}

View File

@ -201,3 +201,29 @@ class ECRResponse(BaseResponse):
tag_keys = self._get_param("tagKeys", [])
return json.dumps(self.ecr_backend.untag_resource(arn, tag_keys))
def put_image_tag_mutability(self):
registry_id = self._get_param("registryId")
repository_name = self._get_param("repositoryName")
image_tag_mutability = self._get_param("imageTagMutability")
return json.dumps(
self.ecr_backend.put_image_tag_mutability(
registry_id=registry_id,
repository_name=repository_name,
image_tag_mutability=image_tag_mutability,
)
)
def put_image_scanning_configuration(self):
registry_id = self._get_param("registryId")
repository_name = self._get_param("repositoryName")
image_scan_config = self._get_param("imageScanningConfiguration")
return json.dumps(
self.ecr_backend.put_image_scanning_configuration(
registry_id=registry_id,
repository_name=repository_name,
image_scan_config=image_scan_config,
)
)

View File

@ -2,7 +2,6 @@ TestAccAWSEc2TransitGatewayDxGatewayAttachmentDataSource
TestAccAWSEc2TransitGatewayPeeringAttachmentAccepter
TestAccAWSEc2TransitGatewayRouteTableAssociation
TestAccAWSEc2TransitGatewayVpcAttachment
TestAccAWSEcrRepository
TestAccAWSEcrRepositoryPolicy
TestAccAWSFms
TestAccAWSIAMRolePolicy

View File

@ -42,6 +42,7 @@ TestAccAWSEc2TransitGatewayVpcAttachmentDataSource
TestAccAWSEc2TransitGatewayVpnAttachmentDataSource
TestAccAWSEc2TransitGatewayPeeringAttachment
TestAccAWSEc2TransitGatewayPeeringAttachmentDataSource
TestAccAWSEcrRepository
TestAccAWSEcrRepositoryDataSource
TestAccAWSElasticBeanstalkSolutionStackDataSource
TestAccAWSElbHostedZoneId

View File

@ -1228,6 +1228,27 @@ def test_list_tags_for_resource_error_not_exists():
)
@mock_ecr
def test_list_tags_for_resource_error_invalid_param():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
# when
with pytest.raises(ClientError) as e:
client.list_tags_for_resource(resourceArn="invalid",)
# then
ex = e.value
ex.operation_name.should.equal("ListTagsForResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Invalid parameter at 'resourceArn' failed to satisfy constraint: "
"'Invalid ARN'"
)
@mock_ecr
def test_tag_resource():
# given
@ -1322,3 +1343,128 @@ def test_untag_resource_error_not_exists():
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_put_image_tag_mutability():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
response = client.describe_repositories(repositoryNames=[repo_name])
response["repositories"][0]["imageTagMutability"].should.equal("MUTABLE")
# when
response = client.put_image_tag_mutability(
repositoryName=repo_name, imageTagMutability="IMMUTABLE",
)
# then
response["imageTagMutability"].should.equal("IMMUTABLE")
response["registryId"].should.equal(ACCOUNT_ID)
response["repositoryName"].should.equal(repo_name)
response = client.describe_repositories(repositoryNames=[repo_name])
response["repositories"][0]["imageTagMutability"].should.equal("IMMUTABLE")
@mock_ecr
def test_put_image_tag_mutability_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.put_image_tag_mutability(
repositoryName=repo_name, imageTagMutability="IMMUTABLE",
)
# then
ex = e.value
ex.operation_name.should.equal("PutImageTagMutability")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_put_image_tag_mutability_error_invalid_param():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
# when
with pytest.raises(ClientError) as e:
client.put_image_tag_mutability(
repositoryName=repo_name, imageTagMutability="invalid",
)
# then
ex = e.value
ex.operation_name.should.equal("PutImageTagMutability")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Invalid parameter at 'imageTagMutability' failed to satisfy constraint: "
"'Member must satisfy enum value set: [IMMUTABLE, MUTABLE]'"
)
@mock_ecr
def test_put_image_scanning_configuration():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
response = client.describe_repositories(repositoryNames=[repo_name])
response["repositories"][0]["imageScanningConfiguration"].should.equal(
{"scanOnPush": False}
)
# when
response = client.put_image_scanning_configuration(
repositoryName=repo_name, imageScanningConfiguration={"scanOnPush": True}
)
# then
response["imageScanningConfiguration"].should.equal({"scanOnPush": True})
response["registryId"].should.equal(ACCOUNT_ID)
response["repositoryName"].should.equal(repo_name)
response = client.describe_repositories(repositoryNames=[repo_name])
response["repositories"][0]["imageScanningConfiguration"].should.equal(
{"scanOnPush": True}
)
@mock_ecr
def test_put_image_scanning_configuration_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.put_image_scanning_configuration(
repositoryName=repo_name, imageScanningConfiguration={"scanOnPush": True},
)
# then
ex = e.value
ex.operation_name.should.equal("PutImageScanningConfiguration")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)