Update ecr (#4128)

* Add ecr.list_tags_for_resource

* Add ecr.tag_resource

* Add ecr.untag_resource

* Add default KMS key policy, if not specified
This commit is contained in:
Anton Grübel 2021-08-04 00:21:15 +09:00 committed by GitHub
parent 0ec99fae8b
commit 788b8e617d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 728 additions and 100 deletions

View File

@ -1,15 +1,44 @@
from __future__ import unicode_literals
from moto.core.exceptions import RESTError, JsonRESTError
from moto.core.exceptions import JsonRESTError
class RepositoryNotFoundException(RESTError):
class RepositoryAlreadyExistsException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super(RepositoryAlreadyExistsException, self).__init__(
error_type="RepositoryAlreadyExistsException",
message=(
f"The repository with name '{repository_name}' already exists "
f"in the registry with id '{registry_id}'"
),
)
class RepositoryNotEmptyException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super(RepositoryNotEmptyException, self).__init__(
error_type="RepositoryNotEmptyException",
message=(
f"The repository with name '{repository_name}' "
f"in registry with id '{registry_id}' "
"cannot be deleted because it still contains images"
),
)
class RepositoryNotFoundException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super(RepositoryNotFoundException, self).__init__(
error_type="RepositoryNotFoundException",
message="The repository with name '{0}' does not exist in the registry "
"with id '{1}'".format(repository_name, registry_id),
message=(
f"The repository with name '{repository_name}' does not exist "
f"in the registry with id '{registry_id}'"
),
)

View File

@ -2,16 +2,24 @@ from __future__ import unicode_literals
import hashlib
import re
import uuid
from datetime import datetime
from random import random
from botocore.exceptions import ParamValidationError
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core import BaseBackend, BaseModel, CloudFormationModel, ACCOUNT_ID
from moto.core.utils import iso_8601_datetime_without_milliseconds
from moto.ec2 import ec2_backends
from moto.ecr.exceptions import ImageNotFoundException, RepositoryNotFoundException
from moto.ecr.exceptions import (
ImageNotFoundException,
RepositoryNotFoundException,
RepositoryAlreadyExistsException,
RepositoryNotEmptyException,
)
from moto.utilities.tagging_service import TaggingService
DEFAULT_REGISTRY_ID = "012345678910"
DEFAULT_REGISTRY_ID = ACCOUNT_ID
class BaseObject(BaseModel):
@ -39,18 +47,40 @@ class BaseObject(BaseModel):
class Repository(BaseObject, CloudFormationModel):
def __init__(self, repository_name):
def __init__(
self,
region_name,
repository_name,
encryption_config,
image_scan_config,
image_tag_mutablility,
):
self.region_name = region_name
self.registry_id = DEFAULT_REGISTRY_ID
self.arn = "arn:aws:ecr:us-east-1:{0}:repository/{1}".format(
self.registry_id, repository_name
self.arn = (
f"arn:aws:ecr:{region_name}:{self.registry_id}:repository/{repository_name}"
)
self.name = repository_name
# self.created = datetime.utcnow()
self.uri = "{0}.dkr.ecr.us-east-1.amazonaws.com/{1}".format(
self.registry_id, repository_name
self.created_at = datetime.utcnow()
self.uri = (
f"{self.registry_id}.dkr.ecr.{region_name}.amazonaws.com/{repository_name}"
)
self.image_tag_mutability = image_tag_mutablility or "MUTABLE"
self.image_scanning_configuration = image_scan_config or {"scanOnPush": False}
self.encryption_configuration = self._determine_encryption_config(
encryption_config
)
self.images = []
def _determine_encryption_config(self, encryption_config):
if not encryption_config:
return {"encryptionType": "AES256"}
if encryption_config == {"encryptionType": "KMS"}:
encryption_config[
"kmsKey"
] = f"arn:aws:kms:{self.region_name}:{ACCOUNT_ID}:key/{uuid.uuid4()}"
return encryption_config
@property
def physical_resource_id(self):
return self.name
@ -63,10 +93,32 @@ class Repository(BaseObject, CloudFormationModel):
response_object["repositoryArn"] = self.arn
response_object["repositoryName"] = self.name
response_object["repositoryUri"] = self.uri
# response_object['createdAt'] = self.created
response_object["createdAt"] = iso_8601_datetime_without_milliseconds(
self.created_at
)
del response_object["arn"], response_object["name"], response_object["images"]
return response_object
def update(self, image_scan_config, image_tag_mutability):
if image_scan_config:
self.image_scan_config = image_scan_config
if image_tag_mutability:
self.image_tag_mutability = image_tag_mutability
def delete(self, region_name):
ecr_backend = ecr_backends[region_name]
ecr_backend.delete_repository(self.name)
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == "Arn":
return self.arn
elif attribute_name == "RepositoryUri":
return self.uri
raise UnformattedGetAttTemplateException()
@staticmethod
def cloudformation_name_type():
return "RepositoryName"
@ -81,32 +133,52 @@ class Repository(BaseObject, CloudFormationModel):
cls, resource_name, cloudformation_json, region_name
):
ecr_backend = ecr_backends[region_name]
properties = cloudformation_json["Properties"]
encryption_config = properties.get("EncryptionConfiguration")
image_scan_config = properties.get("ImageScanningConfiguration")
image_tag_mutablility = properties.get("ImageTagMutability")
tags = properties.get("Tags", [])
return ecr_backend.create_repository(
# RepositoryName is optional in CloudFormation, thus create a random
# name if necessary
repository_name=resource_name
repository_name=resource_name,
encryption_config=encryption_config,
image_scan_config=image_scan_config,
image_tag_mutablility=image_tag_mutablility,
tags=tags,
)
@classmethod
def update_from_cloudformation_json(
cls, original_resource, new_resource_name, cloudformation_json, region_name
):
ecr_backend = ecr_backends[region_name]
properties = cloudformation_json["Properties"]
encryption_configuration = properties.get(
"EncryptionConfiguration", {"encryptionType": "AES256"}
)
if original_resource.name != properties["RepositoryName"]:
ecr_backend = ecr_backends[region_name]
ecr_backend.delete_cluster(original_resource.arn)
return ecr_backend.create_repository(
# RepositoryName is optional in CloudFormation, thus create a
# random name if necessary
repository_name=properties.get(
"RepositoryName",
"RepositoryName{0}".format(int(random() * 10 ** 6)),
)
if (
new_resource_name == original_resource.name
and encryption_configuration == original_resource.encryption_configuration
):
original_resource.update(
properties.get("ImageScanningConfiguration"),
properties.get("ImageTagMutability"),
)
else:
# no-op when nothing changed between old and new resources
ecr_backend.tagger.tag_resource(
original_resource.arn, properties.get("Tags", [])
)
return original_resource
else:
original_resource.delete(region_name)
return cls.create_from_cloudformation_json(
new_resource_name, cloudformation_json, region_name
)
class Image(BaseObject):
@ -205,8 +277,15 @@ class Image(BaseObject):
class ECRBackend(BaseBackend):
def __init__(self):
def __init__(self, region_name):
self.region_name = region_name
self.repositories = {}
self.tagger = TaggingService(tagName="tags")
def reset(self):
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def describe_repositories(self, registry_id=None, repository_names=None):
"""
@ -233,13 +312,38 @@ class ECRBackend(BaseBackend):
repositories.append(repository.response_object)
return repositories
def create_repository(self, repository_name):
repository = Repository(repository_name)
def create_repository(
self,
repository_name,
encryption_config,
image_scan_config,
image_tag_mutablility,
tags,
):
if self.repositories.get(repository_name):
raise RepositoryAlreadyExistsException(repository_name, DEFAULT_REGISTRY_ID)
repository = Repository(
region_name=self.region_name,
repository_name=repository_name,
encryption_config=encryption_config,
image_scan_config=image_scan_config,
image_tag_mutablility=image_tag_mutablility,
)
self.repositories[repository_name] = repository
self.tagger.tag_resource(repository.arn, tags)
return repository
def delete_repository(self, repository_name, registry_id=None):
if repository_name in self.repositories:
repo = self.repositories.get(repository_name)
if repo:
if repo.images:
raise RepositoryNotEmptyException(
repository_name, registry_id or DEFAULT_REGISTRY_ID
)
self.tagger.delete_all_tags_for_resource(repo.arn)
return self.repositories.pop(repository_name)
else:
raise RepositoryNotFoundException(
@ -483,7 +587,36 @@ class ECRBackend(BaseBackend):
return response
def list_tags_for_resource(self, arn):
name = arn.split("/")[-1]
repo = self.repositories.get(name)
if repo:
return self.tagger.list_tags_for_resource(repo.arn)
else:
raise RepositoryNotFoundException(name, DEFAULT_REGISTRY_ID)
def tag_resource(self, arn, tags):
name = arn.split("/")[-1]
repo = self.repositories.get(name)
if repo:
self.tagger.tag_resource(repo.arn, tags)
return {}
else:
raise RepositoryNotFoundException(name, DEFAULT_REGISTRY_ID)
def untag_resource(self, arn, tag_keys):
name = arn.split("/")[-1]
repo = self.repositories.get(name)
if repo:
self.tagger.untag_resource_using_names(repo.arn, tag_keys)
return {}
else:
raise RepositoryNotFoundException(name, DEFAULT_REGISTRY_ID)
ecr_backends = {}
for region, ec2_backend in ec2_backends.items():
ecr_backends[region] = ECRBackend()
ecr_backends[region] = ECRBackend(region)

View File

@ -20,14 +20,23 @@ class ECRResponse(BaseResponse):
except ValueError:
return {}
def _get_param(self, param):
return self.request_params.get(param, None)
def _get_param(self, param, if_none=None):
return self.request_params.get(param, if_none)
def create_repository(self):
repository_name = self._get_param("repositoryName")
if repository_name is None:
repository_name = "default"
repository = self.ecr_backend.create_repository(repository_name)
encryption_config = self._get_param("encryptionConfiguration")
image_scan_config = self._get_param("imageScanningConfiguration")
image_tag_mutablility = self._get_param("imageTagMutability")
tags = self._get_param("tags", [])
repository = self.ecr_backend.create_repository(
repository_name=repository_name,
encryption_config=encryption_config,
image_scan_config=image_scan_config,
image_tag_mutablility=image_tag_mutablility,
tags=tags,
)
return json.dumps({"repository": repository.response_object})
def describe_repositories(self):
@ -175,3 +184,20 @@ class ECRResponse(BaseResponse):
def upload_layer_part(self):
if self.is_not_dryrun("UploadLayerPart"):
raise NotImplementedError("ECR.upload_layer_part is not yet implemented")
def list_tags_for_resource(self):
arn = self._get_param("resourceArn")
return json.dumps(self.ecr_backend.list_tags_for_resource(arn))
def tag_resource(self):
arn = self._get_param("resourceArn")
tags = self._get_param("tags", [])
return json.dumps(self.ecr_backend.tag_resource(arn, tags))
def untag_resource(self):
arn = self._get_param("resourceArn")
tag_keys = self._get_param("tagKeys", [])
return json.dumps(self.ecr_backend.untag_resource(arn, tag_keys))

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import json
import os
from collections import defaultdict
from datetime import datetime, timedelta
@ -20,7 +21,7 @@ class Key(CloudFormationModel):
):
self.id = generate_key_id()
self.creation_date = unix_time()
self.policy = policy
self.policy = policy or self.generate_default_policy()
self.key_usage = key_usage
self.key_state = "Enabled"
self.description = description
@ -34,6 +35,23 @@ class Key(CloudFormationModel):
self.key_manager = "CUSTOMER"
self.customer_master_key_spec = customer_master_key_spec or "SYMMETRIC_DEFAULT"
def generate_default_policy(self):
return json.dumps(
{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
"Action": "kms:*",
"Resource": "*",
}
],
}
)
@property
def physical_resource_id(self):
return self.id

View File

@ -2,5 +2,7 @@ TestAccAWSEc2TransitGatewayDxGatewayAttachmentDataSource
TestAccAWSEc2TransitGatewayPeeringAttachmentAccepter
TestAccAWSEc2TransitGatewayRouteTableAssociation
TestAccAWSEc2TransitGatewayVpcAttachment
TestAccAWSEcrRepository
TestAccAWSEcrRepositoryPolicy
TestAccAWSFms
TestAccAWSIAMRolePolicy

View File

@ -42,6 +42,7 @@ TestAccAWSEc2TransitGatewayVpcAttachmentDataSource
TestAccAWSEc2TransitGatewayVpnAttachmentDataSource
TestAccAWSEc2TransitGatewayPeeringAttachment
TestAccAWSEc2TransitGatewayPeeringAttachmentDataSource
TestAccAWSEcrRepositoryDataSource
TestAccAWSElasticBeanstalkSolutionStackDataSource
TestAccAWSElbHostedZoneId
TestAccAWSElbServiceAccount

View File

@ -3,6 +3,8 @@ from __future__ import unicode_literals
import hashlib
import json
from datetime import datetime
import pytest
from freezegun import freeze_time
import os
from random import random
@ -17,6 +19,8 @@ from dateutil.tz import tzlocal
from moto import mock_ecr
from unittest import SkipTest
from moto.core import ACCOUNT_ID
def _create_image_digest(contents=None):
if not contents:
@ -56,17 +60,104 @@ def _create_image_manifest():
@mock_ecr
def test_create_repository():
# given
client = boto3.client("ecr", region_name="us-east-1")
response = client.create_repository(repositoryName="test_ecr_repository")
response["repository"]["repositoryName"].should.equal("test_ecr_repository")
response["repository"]["repositoryArn"].should.equal(
"arn:aws:ecr:us-east-1:012345678910:repository/test_ecr_repository"
repo_name = "test-repo"
# when
response = client.create_repository(repositoryName=repo_name)
# then
repo = response["repository"]
repo["repositoryName"].should.equal(repo_name)
repo["repositoryArn"].should.equal(
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/{repo_name}"
)
response["repository"]["registryId"].should.equal("012345678910")
response["repository"]["repositoryUri"].should.equal(
"012345678910.dkr.ecr.us-east-1.amazonaws.com/test_ecr_repository"
repo["registryId"].should.equal(ACCOUNT_ID)
repo["repositoryUri"].should.equal(
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/{repo_name}"
)
repo["createdAt"].should.be.a(datetime)
repo["imageTagMutability"].should.equal("MUTABLE")
repo["imageScanningConfiguration"].should.equal({"scanOnPush": False})
repo["encryptionConfiguration"].should.equal({"encryptionType": "AES256"})
@mock_ecr
def test_create_repository_with_non_default_config():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "test-repo"
kms_key = f"arn:aws:kms:{region_name}:{ACCOUNT_ID}:key/51d81fab-b138-4bd2-8a09-07fd6d37224d"
# when
response = client.create_repository(
repositoryName=repo_name,
imageTagMutability="IMMUTABLE",
imageScanningConfiguration={"scanOnPush": True},
encryptionConfiguration={"encryptionType": "KMS", "kmsKey": kms_key},
tags=[{"Key": "key-1", "Value": "value-1"}],
)
# then
repo = response["repository"]
repo["repositoryName"].should.equal(repo_name)
repo["repositoryArn"].should.equal(
f"arn:aws:ecr:{region_name}:{ACCOUNT_ID}:repository/{repo_name}"
)
repo["registryId"].should.equal(ACCOUNT_ID)
repo["repositoryUri"].should.equal(
f"{ACCOUNT_ID}.dkr.ecr.{region_name}.amazonaws.com/{repo_name}"
)
repo["createdAt"].should.be.a(datetime)
repo["imageTagMutability"].should.equal("IMMUTABLE")
repo["imageScanningConfiguration"].should.equal({"scanOnPush": True})
repo["encryptionConfiguration"].should.equal(
{"encryptionType": "KMS", "kmsKey": kms_key}
)
@mock_ecr
def test_create_repository_with_aws_managed_kms():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "test-repo"
# when
repo = client.create_repository(
repositoryName=repo_name, encryptionConfiguration={"encryptionType": "KMS"}
)["repository"]
# then
repo["repositoryName"].should.equal(repo_name)
repo["encryptionConfiguration"]["encryptionType"].should.equal("KMS")
repo["encryptionConfiguration"]["kmsKey"].should.match(
r"arn:aws:kms:eu-central-1:[0-9]{12}:key/[a-f0-9]{8}-[a-f0-9]{4}-[1-5][a-f0-9]{3}-[ab89][a-f0-9]{3}-[a-f0-9]{12}$"
)
@mock_ecr
def test_create_repository_error_already_exists():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
# when
with pytest.raises(ClientError) as e:
client.create_repository(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("CreateRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryAlreadyExistsException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' already exists "
f"in the registry with id '{ACCOUNT_ID}'"
)
# response['repository']['createdAt'].should.equal(0)
@mock_ecr
@ -78,26 +169,26 @@ def test_describe_repositories():
len(response["repositories"]).should.equal(2)
repository_arns = [
"arn:aws:ecr:us-east-1:012345678910:repository/test_repository1",
"arn:aws:ecr:us-east-1:012345678910:repository/test_repository0",
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/test_repository1",
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/test_repository0",
]
set(
sorted(
[
response["repositories"][0]["repositoryArn"],
response["repositories"][1]["repositoryArn"],
]
).should.equal(set(repository_arns))
).should.equal(sorted(repository_arns))
repository_uris = [
"012345678910.dkr.ecr.us-east-1.amazonaws.com/test_repository1",
"012345678910.dkr.ecr.us-east-1.amazonaws.com/test_repository0",
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/test_repository1",
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/test_repository0",
]
set(
sorted(
[
response["repositories"][0]["repositoryUri"],
response["repositories"][1]["repositoryUri"],
]
).should.equal(set(repository_uris))
).should.equal(sorted(repository_uris))
@mock_ecr
@ -105,30 +196,30 @@ def test_describe_repositories_1():
client = boto3.client("ecr", region_name="us-east-1")
_ = client.create_repository(repositoryName="test_repository1")
_ = client.create_repository(repositoryName="test_repository0")
response = client.describe_repositories(registryId="012345678910")
response = client.describe_repositories(registryId=ACCOUNT_ID)
len(response["repositories"]).should.equal(2)
repository_arns = [
"arn:aws:ecr:us-east-1:012345678910:repository/test_repository1",
"arn:aws:ecr:us-east-1:012345678910:repository/test_repository0",
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/test_repository1",
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/test_repository0",
]
set(
sorted(
[
response["repositories"][0]["repositoryArn"],
response["repositories"][1]["repositoryArn"],
]
).should.equal(set(repository_arns))
).should.equal(sorted(repository_arns))
repository_uris = [
"012345678910.dkr.ecr.us-east-1.amazonaws.com/test_repository1",
"012345678910.dkr.ecr.us-east-1.amazonaws.com/test_repository0",
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/test_repository1",
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/test_repository0",
]
set(
sorted(
[
response["repositories"][0]["repositoryUri"],
response["repositories"][1]["repositoryUri"],
]
).should.equal(set(repository_uris))
).should.equal(sorted(repository_uris))
@mock_ecr
@ -147,45 +238,71 @@ def test_describe_repositories_3():
_ = client.create_repository(repositoryName="test_repository0")
response = client.describe_repositories(repositoryNames=["test_repository1"])
len(response["repositories"]).should.equal(1)
repository_arn = "arn:aws:ecr:us-east-1:012345678910:repository/test_repository1"
repository_arn = f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/test_repository1"
response["repositories"][0]["repositoryArn"].should.equal(repository_arn)
repository_uri = "012345678910.dkr.ecr.us-east-1.amazonaws.com/test_repository1"
repository_uri = f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/test_repository1"
response["repositories"][0]["repositoryUri"].should.equal(repository_uri)
@mock_ecr
def test_describe_repositories_with_image():
# given
client = boto3.client("ecr", region_name="us-east-1")
_ = client.create_repository(repositoryName="test_repository")
_ = client.put_image(
repositoryName="test_repository",
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
client.put_image(
repositoryName=repo_name,
imageManifest=json.dumps(_create_image_manifest()),
imageTag="latest",
)
response = client.describe_repositories(repositoryNames=["test_repository"])
len(response["repositories"]).should.equal(1)
# when
response = client.describe_repositories(repositoryNames=[repo_name])
# then
response["repositories"].should.have.length_of(1)
repo = response["repositories"][0]
repo["registryId"].should.equal(ACCOUNT_ID)
repo["repositoryArn"].should.equal(
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/{repo_name}"
)
repo["repositoryName"].should.equal(repo_name)
repo["repositoryUri"].should.equal(
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/{repo_name}"
)
repo["createdAt"].should.be.a(datetime)
repo["imageScanningConfiguration"].should.equal({"scanOnPush": False})
repo["imageTagMutability"].should.equal("MUTABLE")
repo["encryptionConfiguration"].should.equal({"encryptionType": "AES256"})
@mock_ecr
def test_delete_repository():
# given
client = boto3.client("ecr", region_name="us-east-1")
_ = client.create_repository(repositoryName="test_repository")
response = client.delete_repository(repositoryName="test_repository")
response["repository"]["repositoryName"].should.equal("test_repository")
response["repository"]["repositoryArn"].should.equal(
"arn:aws:ecr:us-east-1:012345678910:repository/test_repository"
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
# when
response = client.delete_repository(repositoryName=repo_name)
# then
repo = response["repository"]
repo["repositoryName"].should.equal(repo_name)
repo["repositoryArn"].should.equal(
f"arn:aws:ecr:us-east-1:{ACCOUNT_ID}:repository/{repo_name}"
)
response["repository"]["registryId"].should.equal("012345678910")
response["repository"]["repositoryUri"].should.equal(
"012345678910.dkr.ecr.us-east-1.amazonaws.com/test_repository"
repo["registryId"].should.equal(ACCOUNT_ID)
repo["repositoryUri"].should.equal(
f"{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/{repo_name}"
)
# response['repository']['createdAt'].should.equal(0)
repo["createdAt"].should.be.a(datetime)
repo["imageTagMutability"].should.equal("MUTABLE")
response = client.describe_repositories()
len(response["repositories"]).should.equal(0)
response["repositories"].should.have.length_of(0)
@mock_ecr
@ -202,7 +319,7 @@ def test_put_image():
response["image"]["imageId"]["imageTag"].should.equal("latest")
response["image"]["imageId"]["imageDigest"].should.contain("sha")
response["image"]["repositoryName"].should.equal("test_repository")
response["image"]["registryId"].should.equal("012345678910")
response["image"]["registryId"].should.equal(ACCOUNT_ID)
@mock_ecr
@ -256,7 +373,7 @@ def test_put_image_with_multiple_tags():
response["image"]["imageId"]["imageTag"].should.equal("v1")
response["image"]["imageId"]["imageDigest"].should.contain("sha")
response["image"]["repositoryName"].should.equal("test_repository")
response["image"]["registryId"].should.equal("012345678910")
response["image"]["registryId"].should.equal(ACCOUNT_ID)
response1 = client.put_image(
repositoryName="test_repository",
@ -267,7 +384,7 @@ def test_put_image_with_multiple_tags():
response1["image"]["imageId"]["imageTag"].should.equal("latest")
response1["image"]["imageId"]["imageDigest"].should.contain("sha")
response1["image"]["repositoryName"].should.equal("test_repository")
response1["image"]["registryId"].should.equal("012345678910")
response1["image"]["registryId"].should.equal(ACCOUNT_ID)
response2 = client.describe_images(repositoryName="test_repository")
type(response2["imageDetails"]).should.be(list)
@ -275,7 +392,7 @@ def test_put_image_with_multiple_tags():
response2["imageDetails"][0]["imageDigest"].should.contain("sha")
response2["imageDetails"][0]["registryId"].should.equal("012345678910")
response2["imageDetails"][0]["registryId"].should.equal(ACCOUNT_ID)
response2["imageDetails"][0]["repositoryName"].should.equal("test_repository")
@ -398,10 +515,10 @@ def test_describe_images():
response["imageDetails"][2]["imageDigest"].should.contain("sha")
response["imageDetails"][3]["imageDigest"].should.contain("sha")
response["imageDetails"][0]["registryId"].should.equal("012345678910")
response["imageDetails"][1]["registryId"].should.equal("012345678910")
response["imageDetails"][2]["registryId"].should.equal("012345678910")
response["imageDetails"][3]["registryId"].should.equal("012345678910")
response["imageDetails"][0]["registryId"].should.equal(ACCOUNT_ID)
response["imageDetails"][1]["registryId"].should.equal(ACCOUNT_ID)
response["imageDetails"][2]["registryId"].should.equal(ACCOUNT_ID)
response["imageDetails"][3]["registryId"].should.equal(ACCOUNT_ID)
response["imageDetails"][0]["repositoryName"].should.equal("test_repository")
response["imageDetails"][1]["repositoryName"].should.equal("test_repository")
@ -448,7 +565,7 @@ def test_describe_images_by_tag():
)
len(response["imageDetails"]).should.be(1)
image_detail = response["imageDetails"][0]
image_detail["registryId"].should.equal("012345678910")
image_detail["registryId"].should.equal(ACCOUNT_ID)
image_detail["repositoryName"].should.equal("test_repository")
image_detail["imageTags"].should.equal([put_response["imageId"]["imageTag"]])
image_detail["imageDigest"].should.equal(put_response["imageId"]["imageDigest"])
@ -558,15 +675,48 @@ def test_describe_image_that_doesnt_exist():
@mock_ecr
def test_delete_repository_that_doesnt_exist():
client = boto3.client("ecr", region_name="us-east-1")
repo_name = "repo-that-doesnt-exist"
error_msg = re.compile(
r".*The repository with name 'repo-that-doesnt-exist' does not exist in the registry with id '123'.*",
re.MULTILINE,
# when
with pytest.raises(ClientError) as e:
client.delete_repository(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("DeleteRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
client.delete_repository.when.called_with(
repositoryName="repo-that-doesnt-exist", registryId="123"
).should.throw(ClientError, error_msg)
@mock_ecr
def test_delete_repository_error_not_empty():
client = boto3.client("ecr", region_name="us-east-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
client.put_image(
repositoryName=repo_name,
imageManifest=json.dumps(_create_image_manifest()),
imageTag="latest",
)
# when
with pytest.raises(ClientError) as e:
client.delete_repository(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("DeleteRepository")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotEmptyException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' "
f"in registry with id '{ACCOUNT_ID}' "
"cannot be deleted because it still contains images"
)
@mock_ecr
@ -592,7 +742,7 @@ def test_describe_images_by_digest():
)
len(response["imageDetails"]).should.be(1)
image_detail = response["imageDetails"][0]
image_detail["registryId"].should.equal("012345678910")
image_detail["registryId"].should.equal(ACCOUNT_ID)
image_detail["repositoryName"].should.equal("test_repository")
image_detail["imageTags"].should.equal([put_response["imageId"]["imageTag"]])
image_detail["imageDigest"].should.equal(digest)
@ -608,8 +758,8 @@ def test_get_authorization_token_assume_region():
auth_token_response["authorizationData"].should.equal(
[
{
"authorizationToken": "QVdTOjAxMjM0NTY3ODkxMC1hdXRoLXRva2Vu",
"proxyEndpoint": "https://012345678910.dkr.ecr.us-east-1.amazonaws.com",
"authorizationToken": "QVdTOjEyMzQ1Njc4OTAxMi1hdXRoLXRva2Vu",
"proxyEndpoint": f"https://{ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com",
"expiresAt": datetime(2015, 1, 1, tzinfo=tzlocal()),
}
]
@ -674,7 +824,7 @@ def test_batch_get_image():
response["images"][0]["imageManifest"].should.contain(
"vnd.docker.distribution.manifest.v2+json"
)
response["images"][0]["registryId"].should.equal("012345678910")
response["images"][0]["registryId"].should.equal(ACCOUNT_ID)
response["images"][0]["repositoryName"].should.equal("test_repository")
response["images"][0]["imageId"]["imageTag"].should.equal("v2")
@ -1036,3 +1186,139 @@ def test_batch_delete_image_with_mismatched_digest_and_tag():
batch_delete_response["failures"][0]["failureReason"].should.equal(
"Requested image not found"
)
@mock_ecr
def test_list_tags_for_resource():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
arn = client.create_repository(
repositoryName=repo_name, tags=[{"Key": "key-1", "Value": "value-1"}],
)["repository"]["repositoryArn"]
# when
tags = client.list_tags_for_resource(resourceArn=arn)["tags"]
# then
tags.should.equal([{"Key": "key-1", "Value": "value-1"}])
@mock_ecr
def test_list_tags_for_resource_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.list_tags_for_resource(
resourceArn=f"arn:aws:ecr:{region_name}:{ACCOUNT_ID}:repository/{repo_name}"
)
# then
ex = e.value
ex.operation_name.should.equal("ListTagsForResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_tag_resource():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
arn = client.create_repository(
repositoryName=repo_name, tags=[{"Key": "key-1", "Value": "value-1"}],
)["repository"]["repositoryArn"]
# when
client.tag_resource(resourceArn=arn, tags=[{"Key": "key-2", "Value": "value-2"}])
# then
tags = client.list_tags_for_resource(resourceArn=arn)["tags"]
sorted(tags, key=lambda i: i["Key"]).should.equal(
sorted(
[
{"Key": "key-1", "Value": "value-1"},
{"Key": "key-2", "Value": "value-2"},
],
key=lambda i: i["Key"],
)
)
@mock_ecr
def test_tag_resource_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.tag_resource(
resourceArn=f"arn:aws:ecr:{region_name}:{ACCOUNT_ID}:repository/{repo_name}",
tags=[{"Key": "key-1", "Value": "value-2"}],
)
# then
ex = e.value
ex.operation_name.should.equal("TagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_untag_resource():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
arn = client.create_repository(
repositoryName=repo_name,
tags=[
{"Key": "key-1", "Value": "value-1"},
{"Key": "key-2", "Value": "value-2"},
],
)["repository"]["repositoryArn"]
# when
client.untag_resource(resourceArn=arn, tagKeys=["key-1"])
# then
tags = client.list_tags_for_resource(resourceArn=arn)["tags"]
tags.should.equal([{"Key": "key-2", "Value": "value-2"}])
@mock_ecr
def test_untag_resource_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.untag_resource(
resourceArn=f"arn:aws:ecr:{region_name}:{ACCOUNT_ID}:repository/{repo_name}",
tagKeys=["key-1"],
)
# then
ex = e.value
ex.operation_name.should.equal("UntagResource")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)

View File

@ -0,0 +1,103 @@
import copy
from string import Template
import boto3
import json
from moto import mock_cloudformation, mock_ecr
import sure # noqa
from moto.core import ACCOUNT_ID
repo_template = Template(
json.dumps(
{
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "ECR Repo Test",
"Resources": {
"Repo": {
"Type": "AWS::ECR::Repository",
"Properties": {"RepositoryName": "${repo_name}",},
}
},
"Outputs": {
"Arn": {
"Description": "Repo Arn",
"Value": {"Fn::GetAtt": ["Repo", "Arn"]},
}
},
}
)
)
@mock_ecr
@mock_cloudformation
def test_create_repository():
# given
cfn_client = boto3.client("cloudformation", region_name="eu-central-1")
name = "test-repo"
stack_name = "test-stack"
template = repo_template.substitute({"repo_name": name})
# when
cfn_client.create_stack(StackName=stack_name, TemplateBody=template)
# then
repo_arn = f"arn:aws:ecr:eu-central-1:{ACCOUNT_ID}:repository/{name}"
stack = cfn_client.describe_stacks(StackName=stack_name)["Stacks"][0]
stack["Outputs"][0]["OutputValue"].should.equal(repo_arn)
ecr_client = boto3.client("ecr", region_name="eu-central-1")
response = ecr_client.describe_repositories(repositoryNames=[name])
response["repositories"][0]["repositoryArn"].should.equal(repo_arn)
@mock_ecr
@mock_cloudformation
def test_update_repository():
# given
cfn_client = boto3.client("cloudformation", region_name="eu-central-1")
name = "test-repo"
stack_name = "test-stack"
template = repo_template.substitute({"repo_name": name})
cfn_client.create_stack(StackName=stack_name, TemplateBody=template)
template_update = copy.deepcopy(json.loads(template))
template_update["Resources"]["Repo"]["Properties"][
"ImageTagMutability"
] = "IMMUTABLE"
# when
cfn_client.update_stack(
StackName=stack_name, TemplateBody=json.dumps(template_update)
)
# then
ecr_client = boto3.client("ecr", region_name="eu-central-1")
response = ecr_client.describe_repositories(repositoryNames=[name])
repo = response["repositories"][0]
repo["repositoryArn"].should.equal(
f"arn:aws:ecr:eu-central-1:{ACCOUNT_ID}:repository/{name}"
)
repo["imageTagMutability"].should.equal("IMMUTABLE")
@mock_ecr
@mock_cloudformation
def test_delete_repository():
# given
cfn_client = boto3.client("cloudformation", region_name="eu-central-1")
name = "test-repo"
stack_name = "test-stack"
template = repo_template.substitute({"repo_name": name})
cfn_client.create_stack(StackName=stack_name, TemplateBody=template)
# when
cfn_client.delete_stack(StackName=stack_name)
# then
ecr_client = boto3.client("ecr", region_name="eu-central-1")
response = ecr_client.describe_repositories()["repositories"]
response.should.have.length_of(0)

View File

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import json
from datetime import datetime
from dateutil.tz import tzutc
import base64
@ -12,6 +14,7 @@ from freezegun import freeze_time
import pytest
from moto import mock_kms
from moto.core import ACCOUNT_ID
PLAINTEXT_VECTORS = [
b"some encodeable plaintext",
@ -118,6 +121,33 @@ def test_describe_key():
response["KeyMetadata"].should_not.have.key("SigningAlgorithms")
@mock_kms
def test_get_key_policy_default():
# given
client = boto3.client("kms", region_name="us-east-1")
key_id = client.create_key()["KeyMetadata"]["KeyId"]
# when
policy = client.get_key_policy(KeyId=key_id, PolicyName="default")["Policy"]
# then
json.loads(policy).should.equal(
{
"Version": "2012-10-17",
"Id": "key-default-1",
"Statement": [
{
"Sid": "Enable IAM User Permissions",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
"Action": "kms:*",
"Resource": "*",
}
],
}
)
@pytest.mark.parametrize(
"key_id",
[