Add ecr registry policy (#4159)

* Add ecr.put_registry_policy

* Add ecr.get_registry_policy

* Add ecr.delete_registry_policy

* Add ecr registry policy test for Terraform and cleanup
This commit is contained in:
Anton Grübel 2021-08-11 21:18:12 +09:00 committed by GitHub
parent 5e6b7ee529
commit 6f361e6afb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 234 additions and 18 deletions

View File

@ -16,6 +16,18 @@ class LifecyclePolicyNotFoundException(JsonRESTError):
)
class RegistryPolicyNotFoundException(JsonRESTError):
code = 400
def __init__(self, registry_id):
super().__init__(
error_type=__class__.__name__,
message=(
f"Registry policy does not exist in the registry with id '{registry_id}'"
),
)
class RepositoryAlreadyExistsException(JsonRESTError):
code = 400

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import hashlib
import json
import re
import uuid
from collections import namedtuple
@ -21,6 +22,7 @@ from moto.ecr.exceptions import (
InvalidParameterException,
RepositoryPolicyNotFoundException,
LifecyclePolicyNotFoundException,
RegistryPolicyNotFoundException,
)
from moto.ecr.policy_validation import EcrLifecyclePolicyValidator
from moto.iam.exceptions import MalformedPolicyDocument
@ -294,6 +296,7 @@ class Image(BaseObject):
class ECRBackend(BaseBackend):
def __init__(self, region_name):
self.region_name = region_name
self.registry_policy = None
self.repositories: Dict[str, Repository] = {}
self.tagger = TaggingService(tagName="tags")
@ -764,6 +767,55 @@ class ECRBackend(BaseBackend):
),
}
def _validate_registry_policy_action(self, policy_text):
# only CreateRepository & ReplicateImage actions are allowed
VALID_ACTIONS = {"ecr:CreateRepository", "ecr:ReplicateImage"}
policy = json.loads(policy_text)
for statement in policy["Statement"]:
if set(statement["Action"]) - VALID_ACTIONS:
raise MalformedPolicyDocument()
def put_registry_policy(self, policy_text):
try:
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text)
iam_policy_document_validator.validate()
self._validate_registry_policy_action(policy_text)
except MalformedPolicyDocument:
raise InvalidParameterException(
"Invalid parameter at 'PolicyText' failed to satisfy constraint: "
"'Invalid registry policy provided'"
)
self.registry_policy = policy_text
return {
"registryId": ACCOUNT_ID,
"policyText": policy_text,
}
def get_registry_policy(self):
if not self.registry_policy:
raise RegistryPolicyNotFoundException(ACCOUNT_ID)
return {
"registryId": ACCOUNT_ID,
"policyText": self.registry_policy,
}
def delete_registry_policy(self):
policy = self.registry_policy
if not policy:
raise RegistryPolicyNotFoundException(ACCOUNT_ID)
self.registry_policy = None
return {
"registryId": ACCOUNT_ID,
"policyText": policy,
}
ecr_backends = {}
for region, ec2_backend in ec2_backends.items():

View File

@ -108,10 +108,6 @@ class ECRResponse(BaseResponse):
)
return json.dumps(response)
def can_paginate(self):
if self.is_not_dryrun("CanPaginate"):
raise NotImplementedError("ECR.can_paginate is not yet implemented")
def complete_layer_upload(self):
if self.is_not_dryrun("CompleteLayerUpload"):
raise NotImplementedError(
@ -128,12 +124,6 @@ class ECRResponse(BaseResponse):
)
)
def generate_presigned_url(self):
if self.is_not_dryrun("GeneratePresignedUrl"):
raise NotImplementedError(
"ECR.generate_presigned_url is not yet implemented"
)
def get_authorization_token(self):
registry_ids = self._get_param("registryIds")
if not registry_ids:
@ -159,10 +149,6 @@ class ECRResponse(BaseResponse):
"ECR.get_download_url_for_layer is not yet implemented"
)
def get_paginator(self):
if self.is_not_dryrun("GetPaginator"):
raise NotImplementedError("ECR.get_paginator is not yet implemented")
def get_repository_policy(self):
registry_id = self._get_param("registryId")
repository_name = self._get_param("repositoryName")
@ -173,10 +159,6 @@ class ECRResponse(BaseResponse):
)
)
def get_waiter(self):
if self.is_not_dryrun("GetWaiter"):
raise NotImplementedError("ECR.get_waiter is not yet implemented")
def initiate_layer_upload(self):
if self.is_not_dryrun("InitiateLayerUpload"):
raise NotImplementedError(
@ -278,3 +260,14 @@ class ECRResponse(BaseResponse):
registry_id=registry_id, repository_name=repository_name,
)
)
def put_registry_policy(self):
policy_text = self._get_param("policyText")
return json.dumps(self.ecr_backend.put_registry_policy(policy_text=policy_text))
def get_registry_policy(self):
return json.dumps(self.ecr_backend.get_registry_policy())
def delete_registry_policy(self):
return json.dumps(self.ecr_backend.delete_registry_policy())

View File

@ -44,6 +44,7 @@ TestAccAWSEc2TransitGatewayVpnAttachmentDataSource
TestAccAWSEc2TransitGatewayPeeringAttachment
TestAccAWSEc2TransitGatewayPeeringAttachmentDataSource
TestAccAWSEcrLifecyclePolicy
TestAccAWSEcrRegistryPolicy
TestAccAWSEcrRepository
TestAccAWSEcrRepositoryDataSource
TestAccAWSEcrRepositoryPolicy

View File

@ -1955,3 +1955,161 @@ def test_delete_lifecycle_policy_error_policy_not_exists():
f"for the repository with name '{repo_name}' "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_put_registry_policy():
# given
client = boto3.client("ecr", region_name="eu-central-1")
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": ["arn:aws:iam::111111111111:root", "222222222222"]
},
"Action": ["ecr:CreateRepository", "ecr:ReplicateImage"],
"Resource": "*",
}
],
}
# when
response = client.put_registry_policy(policyText=json.dumps(policy))
# then
response["registryId"].should.equal(ACCOUNT_ID)
json.loads(response["policyText"]).should.equal(policy)
@mock_ecr
def test_put_registry_policy_error_invalid_action():
# given
client = boto3.client("ecr", region_name="eu-central-1")
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::111111111111:root"},
"Action": [
"ecr:CreateRepository",
"ecr:ReplicateImage",
"ecr:DescribeRepositories",
],
"Resource": "*",
}
],
}
# when
with pytest.raises(ClientError) as e:
client.put_registry_policy(policyText=json.dumps(policy))
# then
ex = e.value
ex.operation_name.should.equal("PutRegistryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Invalid parameter at 'PolicyText' failed to satisfy constraint: "
"'Invalid registry policy provided'"
)
@mock_ecr
def test_get_registry_policy():
# given
client = boto3.client("ecr", region_name="eu-central-1")
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": ["arn:aws:iam::111111111111:root", "222222222222"]
},
"Action": ["ecr:CreateRepository", "ecr:ReplicateImage"],
"Resource": "*",
}
],
}
client.put_registry_policy(policyText=json.dumps(policy))
# when
response = client.get_registry_policy()
# then
response["registryId"].should.equal(ACCOUNT_ID)
json.loads(response["policyText"]).should.equal(policy)
@mock_ecr
def test_get_registry_policy_error_policy_not_exists():
# given
client = boto3.client("ecr", region_name="eu-central-1")
# when
with pytest.raises(ClientError) as e:
client.get_registry_policy()
# then
ex = e.value
ex.operation_name.should.equal("GetRegistryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RegistryPolicyNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"Registry policy does not exist in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_delete_registry_policy():
# given
client = boto3.client("ecr", region_name="eu-central-1")
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": ["arn:aws:iam::111111111111:root", "222222222222"]
},
"Action": ["ecr:CreateRepository", "ecr:ReplicateImage"],
"Resource": "*",
}
],
}
client.put_registry_policy(policyText=json.dumps(policy))
# when
response = client.delete_registry_policy()
# then
response["registryId"].should.equal(ACCOUNT_ID)
json.loads(response["policyText"]).should.equal(policy)
with pytest.raises(ClientError) as e:
client.get_registry_policy()
e.value.response["Error"]["Code"].should.contain("RegistryPolicyNotFoundException")
@mock_ecr
def test_delete_registry_policy_error_policy_not_exists():
# given
client = boto3.client("ecr", region_name="eu-central-1")
# when
with pytest.raises(ClientError) as e:
client.delete_registry_policy()
# then
ex = e.value
ex.operation_name.should.equal("DeleteRegistryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RegistryPolicyNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"Registry policy does not exist in the registry with id '{ACCOUNT_ID}'"
)