diff --git a/moto/ecr/exceptions.py b/moto/ecr/exceptions.py index 12303b9b5..b1086fee6 100644 --- a/moto/ecr/exceptions.py +++ b/moto/ecr/exceptions.py @@ -16,6 +16,18 @@ class LifecyclePolicyNotFoundException(JsonRESTError): ) +class RegistryPolicyNotFoundException(JsonRESTError): + code = 400 + + def __init__(self, registry_id): + super().__init__( + error_type=__class__.__name__, + message=( + f"Registry policy does not exist in the registry with id '{registry_id}'" + ), + ) + + class RepositoryAlreadyExistsException(JsonRESTError): code = 400 diff --git a/moto/ecr/models.py b/moto/ecr/models.py index fc2ac82c1..78575e5ec 100644 --- a/moto/ecr/models.py +++ b/moto/ecr/models.py @@ -1,6 +1,7 @@ from __future__ import unicode_literals import hashlib +import json import re import uuid from collections import namedtuple @@ -21,6 +22,7 @@ from moto.ecr.exceptions import ( InvalidParameterException, RepositoryPolicyNotFoundException, LifecyclePolicyNotFoundException, + RegistryPolicyNotFoundException, ) from moto.ecr.policy_validation import EcrLifecyclePolicyValidator from moto.iam.exceptions import MalformedPolicyDocument @@ -294,6 +296,7 @@ class Image(BaseObject): class ECRBackend(BaseBackend): def __init__(self, region_name): self.region_name = region_name + self.registry_policy = None self.repositories: Dict[str, Repository] = {} self.tagger = TaggingService(tagName="tags") @@ -764,6 +767,55 @@ class ECRBackend(BaseBackend): ), } + def _validate_registry_policy_action(self, policy_text): + # only CreateRepository & ReplicateImage actions are allowed + VALID_ACTIONS = {"ecr:CreateRepository", "ecr:ReplicateImage"} + + policy = json.loads(policy_text) + for statement in policy["Statement"]: + if set(statement["Action"]) - VALID_ACTIONS: + raise MalformedPolicyDocument() + + def put_registry_policy(self, policy_text): + try: + iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text) + iam_policy_document_validator.validate() + + self._validate_registry_policy_action(policy_text) + except MalformedPolicyDocument: + raise InvalidParameterException( + "Invalid parameter at 'PolicyText' failed to satisfy constraint: " + "'Invalid registry policy provided'" + ) + + self.registry_policy = policy_text + + return { + "registryId": ACCOUNT_ID, + "policyText": policy_text, + } + + def get_registry_policy(self): + if not self.registry_policy: + raise RegistryPolicyNotFoundException(ACCOUNT_ID) + + return { + "registryId": ACCOUNT_ID, + "policyText": self.registry_policy, + } + + def delete_registry_policy(self): + policy = self.registry_policy + if not policy: + raise RegistryPolicyNotFoundException(ACCOUNT_ID) + + self.registry_policy = None + + return { + "registryId": ACCOUNT_ID, + "policyText": policy, + } + ecr_backends = {} for region, ec2_backend in ec2_backends.items(): diff --git a/moto/ecr/responses.py b/moto/ecr/responses.py index 8fbb373d2..2f070afa6 100644 --- a/moto/ecr/responses.py +++ b/moto/ecr/responses.py @@ -108,10 +108,6 @@ class ECRResponse(BaseResponse): ) return json.dumps(response) - def can_paginate(self): - if self.is_not_dryrun("CanPaginate"): - raise NotImplementedError("ECR.can_paginate is not yet implemented") - def complete_layer_upload(self): if self.is_not_dryrun("CompleteLayerUpload"): raise NotImplementedError( @@ -128,12 +124,6 @@ class ECRResponse(BaseResponse): ) ) - def generate_presigned_url(self): - if self.is_not_dryrun("GeneratePresignedUrl"): - raise NotImplementedError( - "ECR.generate_presigned_url is not yet implemented" - ) - def get_authorization_token(self): registry_ids = self._get_param("registryIds") if not registry_ids: @@ -159,10 +149,6 @@ class ECRResponse(BaseResponse): "ECR.get_download_url_for_layer is not yet implemented" ) - def get_paginator(self): - if self.is_not_dryrun("GetPaginator"): - raise NotImplementedError("ECR.get_paginator is not yet implemented") - def get_repository_policy(self): registry_id = self._get_param("registryId") repository_name = self._get_param("repositoryName") @@ -173,10 +159,6 @@ class ECRResponse(BaseResponse): ) ) - def get_waiter(self): - if self.is_not_dryrun("GetWaiter"): - raise NotImplementedError("ECR.get_waiter is not yet implemented") - def initiate_layer_upload(self): if self.is_not_dryrun("InitiateLayerUpload"): raise NotImplementedError( @@ -278,3 +260,14 @@ class ECRResponse(BaseResponse): registry_id=registry_id, repository_name=repository_name, ) ) + + def put_registry_policy(self): + policy_text = self._get_param("policyText") + + return json.dumps(self.ecr_backend.put_registry_policy(policy_text=policy_text)) + + def get_registry_policy(self): + return json.dumps(self.ecr_backend.get_registry_policy()) + + def delete_registry_policy(self): + return json.dumps(self.ecr_backend.delete_registry_policy()) diff --git a/tests/terraform-tests.success.txt b/tests/terraform-tests.success.txt index 142342f9b..3b2784adc 100644 --- a/tests/terraform-tests.success.txt +++ b/tests/terraform-tests.success.txt @@ -44,6 +44,7 @@ TestAccAWSEc2TransitGatewayVpnAttachmentDataSource TestAccAWSEc2TransitGatewayPeeringAttachment TestAccAWSEc2TransitGatewayPeeringAttachmentDataSource TestAccAWSEcrLifecyclePolicy +TestAccAWSEcrRegistryPolicy TestAccAWSEcrRepository TestAccAWSEcrRepositoryDataSource TestAccAWSEcrRepositoryPolicy diff --git a/tests/test_ecr/test_ecr_boto3.py b/tests/test_ecr/test_ecr_boto3.py index 86a29cb0a..501ea2e70 100644 --- a/tests/test_ecr/test_ecr_boto3.py +++ b/tests/test_ecr/test_ecr_boto3.py @@ -1955,3 +1955,161 @@ def test_delete_lifecycle_policy_error_policy_not_exists(): f"for the repository with name '{repo_name}' " f"in the registry with id '{ACCOUNT_ID}'" ) + + +@mock_ecr +def test_put_registry_policy(): + # given + client = boto3.client("ecr", region_name="eu-central-1") + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": ["arn:aws:iam::111111111111:root", "222222222222"] + }, + "Action": ["ecr:CreateRepository", "ecr:ReplicateImage"], + "Resource": "*", + } + ], + } + + # when + response = client.put_registry_policy(policyText=json.dumps(policy)) + + # then + response["registryId"].should.equal(ACCOUNT_ID) + json.loads(response["policyText"]).should.equal(policy) + + +@mock_ecr +def test_put_registry_policy_error_invalid_action(): + # given + client = boto3.client("ecr", region_name="eu-central-1") + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": "arn:aws:iam::111111111111:root"}, + "Action": [ + "ecr:CreateRepository", + "ecr:ReplicateImage", + "ecr:DescribeRepositories", + ], + "Resource": "*", + } + ], + } + + # when + with pytest.raises(ClientError) as e: + client.put_registry_policy(policyText=json.dumps(policy)) + + # then + ex = e.value + ex.operation_name.should.equal("PutRegistryPolicy") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("InvalidParameterException") + ex.response["Error"]["Message"].should.equal( + "Invalid parameter at 'PolicyText' failed to satisfy constraint: " + "'Invalid registry policy provided'" + ) + + +@mock_ecr +def test_get_registry_policy(): + # given + client = boto3.client("ecr", region_name="eu-central-1") + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": ["arn:aws:iam::111111111111:root", "222222222222"] + }, + "Action": ["ecr:CreateRepository", "ecr:ReplicateImage"], + "Resource": "*", + } + ], + } + client.put_registry_policy(policyText=json.dumps(policy)) + + # when + response = client.get_registry_policy() + + # then + response["registryId"].should.equal(ACCOUNT_ID) + json.loads(response["policyText"]).should.equal(policy) + + +@mock_ecr +def test_get_registry_policy_error_policy_not_exists(): + # given + client = boto3.client("ecr", region_name="eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.get_registry_policy() + + # then + ex = e.value + ex.operation_name.should.equal("GetRegistryPolicy") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("RegistryPolicyNotFoundException") + ex.response["Error"]["Message"].should.equal( + f"Registry policy does not exist in the registry with id '{ACCOUNT_ID}'" + ) + + +@mock_ecr +def test_delete_registry_policy(): + # given + client = boto3.client("ecr", region_name="eu-central-1") + policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "AWS": ["arn:aws:iam::111111111111:root", "222222222222"] + }, + "Action": ["ecr:CreateRepository", "ecr:ReplicateImage"], + "Resource": "*", + } + ], + } + client.put_registry_policy(policyText=json.dumps(policy)) + + # when + response = client.delete_registry_policy() + + # then + response["registryId"].should.equal(ACCOUNT_ID) + json.loads(response["policyText"]).should.equal(policy) + + with pytest.raises(ClientError) as e: + client.get_registry_policy() + + e.value.response["Error"]["Code"].should.contain("RegistryPolicyNotFoundException") + + +@mock_ecr +def test_delete_registry_policy_error_policy_not_exists(): + # given + client = boto3.client("ecr", region_name="eu-central-1") + + # when + with pytest.raises(ClientError) as e: + client.delete_registry_policy() + + # then + ex = e.value + ex.operation_name.should.equal("DeleteRegistryPolicy") + ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) + ex.response["Error"]["Code"].should.contain("RegistryPolicyNotFoundException") + ex.response["Error"]["Message"].should.equal( + f"Registry policy does not exist in the registry with id '{ACCOUNT_ID}'" + )