Add ecr repo policy (#4148)

* Add ecr.set_repository_policy

* Add ecr.get_repository_policy

* Add ecr.delete_repository_policy
This commit is contained in:
Anton Grübel 2021-08-07 16:48:28 +09:00 committed by GitHub
parent f096b0e717
commit b4ae6a9cce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 351 additions and 10 deletions

View File

@ -42,6 +42,20 @@ class RepositoryNotFoundException(JsonRESTError):
)
class RepositoryPolicyNotFoundException(JsonRESTError):
code = 400
def __init__(self, repository_name, registry_id):
super().__init__(
error_type=__class__.__name__,
message=(
"Repository policy does not exist "
f"for the repository with name '{repository_name}' "
f"in the registry with id '{registry_id}'"
),
)
class ImageNotFoundException(JsonRESTError):
code = 400

View File

@ -18,7 +18,10 @@ from moto.ecr.exceptions import (
RepositoryAlreadyExistsException,
RepositoryNotEmptyException,
InvalidParameterException,
RepositoryPolicyNotFoundException,
)
from moto.iam.exceptions import MalformedPolicyDocument
from moto.iam.policy_validation import IAMPolicyDocumentValidator
from moto.utilities.tagging_service import TaggingService
DEFAULT_REGISTRY_ID = ACCOUNT_ID
@ -77,6 +80,7 @@ class Repository(BaseObject, CloudFormationModel):
self.encryption_configuration = self._determine_encryption_config(
encryption_config
)
self.policy = None
self.images = []
def _determine_encryption_config(self, encryption_config):
@ -658,6 +662,57 @@ class ECRBackend(BaseBackend):
"imageScanningConfiguration": repo.image_scanning_configuration,
}
def set_repository_policy(self, registry_id, repository_name, policy_text):
repo = self._get_repository(repository_name, registry_id)
try:
iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text)
# the repository policy can be defined without a resource field
iam_policy_document_validator._validate_resource_exist = lambda: None
# the repository policy can have the old version 2008-10-17
iam_policy_document_validator._validate_version = lambda: None
iam_policy_document_validator.validate()
except MalformedPolicyDocument:
raise InvalidParameterException(
"Invalid parameter at 'PolicyText' failed to satisfy constraint: "
"'Invalid repository policy provided'"
)
repo.policy = policy_text
return {
"registryId": repo.registry_id,
"repositoryName": repository_name,
"policyText": repo.policy,
}
def get_repository_policy(self, registry_id, repository_name):
repo = self._get_repository(repository_name, registry_id)
if not repo.policy:
raise RepositoryPolicyNotFoundException(repository_name, repo.registry_id)
return {
"registryId": repo.registry_id,
"repositoryName": repository_name,
"policyText": repo.policy,
}
def delete_repository_policy(self, registry_id, repository_name):
repo = self._get_repository(repository_name, registry_id)
policy = repo.policy
if not policy:
raise RepositoryPolicyNotFoundException(repository_name, repo.registry_id)
repo.policy = None
return {
"registryId": repo.registry_id,
"repositoryName": repository_name,
"policyText": policy,
}
ecr_backends = {}
for region, ec2_backend in ec2_backends.items():

View File

@ -119,10 +119,14 @@ class ECRResponse(BaseResponse):
)
def delete_repository_policy(self):
if self.is_not_dryrun("DeleteRepositoryPolicy"):
raise NotImplementedError(
"ECR.delete_repository_policy is not yet implemented"
registry_id = self._get_param("registryId")
repository_name = self._get_param("repositoryName")
return json.dumps(
self.ecr_backend.delete_repository_policy(
registry_id=registry_id, repository_name=repository_name,
)
)
def generate_presigned_url(self):
if self.is_not_dryrun("GeneratePresignedUrl"):
@ -160,10 +164,14 @@ class ECRResponse(BaseResponse):
raise NotImplementedError("ECR.get_paginator is not yet implemented")
def get_repository_policy(self):
if self.is_not_dryrun("GetRepositoryPolicy"):
raise NotImplementedError(
"ECR.get_repository_policy is not yet implemented"
registry_id = self._get_param("registryId")
repository_name = self._get_param("repositoryName")
return json.dumps(
self.ecr_backend.get_repository_policy(
registry_id=registry_id, repository_name=repository_name,
)
)
def get_waiter(self):
if self.is_not_dryrun("GetWaiter"):
@ -176,10 +184,20 @@ class ECRResponse(BaseResponse):
)
def set_repository_policy(self):
if self.is_not_dryrun("SetRepositoryPolicy"):
raise NotImplementedError(
"ECR.set_repository_policy is not yet implemented"
registry_id = self._get_param("registryId")
repository_name = self._get_param("repositoryName")
policy_text = self._get_param("policyText")
# this is usually a safety flag to prevent accidental repository lock outs
# but this would need a much deeper validation of the provided policy
# force = self._get_param("force")
return json.dumps(
self.ecr_backend.set_repository_policy(
registry_id=registry_id,
repository_name=repository_name,
policy_text=policy_text,
)
)
def upload_layer_part(self):
if self.is_not_dryrun("UploadLayerPart"):

View File

@ -15,6 +15,8 @@ VALID_STATEMENT_ELEMENTS = [
"Resource",
"NotResource",
"Effect",
"Principal",
"NotPrincipal",
"Condition",
]

View File

@ -2,6 +2,5 @@ TestAccAWSEc2TransitGatewayDxGatewayAttachmentDataSource
TestAccAWSEc2TransitGatewayPeeringAttachmentAccepter
TestAccAWSEc2TransitGatewayRouteTableAssociation
TestAccAWSEc2TransitGatewayVpcAttachment
TestAccAWSEcrRepositoryPolicy
TestAccAWSFms
TestAccAWSIAMRolePolicy

View File

@ -45,6 +45,7 @@ TestAccAWSEc2TransitGatewayPeeringAttachment
TestAccAWSEc2TransitGatewayPeeringAttachmentDataSource
TestAccAWSEcrRepository
TestAccAWSEcrRepositoryDataSource
TestAccAWSEcrRepositoryPolicy
TestAccAWSElasticBeanstalkSolutionStackDataSource
TestAccAWSElbHostedZoneId
TestAccAWSElbServiceAccount

View File

@ -1468,3 +1468,255 @@ def test_put_image_scanning_configuration_error_not_exists():
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_set_repository_policy():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "root",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
"Action": ["ecr:DescribeImages"],
}
],
}
# when
response = client.set_repository_policy(
repositoryName=repo_name, policyText=json.dumps(policy),
)
# then
response["registryId"].should.equal(ACCOUNT_ID)
response["repositoryName"].should.equal(repo_name)
json.loads(response["policyText"]).should.equal(policy)
@mock_ecr
def test_set_repository_policy_error_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "root",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
"Action": ["ecr:DescribeImages"],
}
],
}
# when
with pytest.raises(ClientError) as e:
client.set_repository_policy(
repositoryName=repo_name, policyText=json.dumps(policy),
)
# then
ex = e.value
ex.operation_name.should.equal("SetRepositoryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_set_repository_policy_error_invalid_param():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
policy = {
"Version": "2012-10-17",
"Statement": [{"Effect": "Allow"}],
}
# when
with pytest.raises(ClientError) as e:
client.set_repository_policy(
repositoryName=repo_name, policyText=json.dumps(policy),
)
# then
ex = e.value
ex.operation_name.should.equal("SetRepositoryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("InvalidParameterException")
ex.response["Error"]["Message"].should.equal(
"Invalid parameter at 'PolicyText' failed to satisfy constraint: "
"'Invalid repository policy provided'"
)
@mock_ecr
def test_get_repository_policy():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "root",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
"Action": ["ecr:DescribeImages"],
}
],
}
client.set_repository_policy(
repositoryName=repo_name, policyText=json.dumps(policy),
)
# when
response = client.get_repository_policy(repositoryName=repo_name)
# then
response["registryId"].should.equal(ACCOUNT_ID)
response["repositoryName"].should.equal(repo_name)
json.loads(response["policyText"]).should.equal(policy)
@mock_ecr
def test_get_repository_policy_error_repo_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.get_repository_policy(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("GetRepositoryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_get_repository_policy_error_policy_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
# when
with pytest.raises(ClientError) as e:
client.get_repository_policy(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("GetRepositoryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryPolicyNotFoundException")
ex.response["Error"]["Message"].should.equal(
"Repository policy does not exist "
f"for the repository with name '{repo_name}' "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_delete_repository_policy():
# given
client = boto3.client("ecr", region_name="eu-central-1")
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "root",
"Effect": "Allow",
"Principal": {"AWS": f"arn:aws:iam::{ACCOUNT_ID}:root"},
"Action": ["ecr:DescribeImages"],
}
],
}
client.set_repository_policy(
repositoryName=repo_name, policyText=json.dumps(policy),
)
# when
response = client.delete_repository_policy(repositoryName=repo_name)
# then
response["registryId"].should.equal(ACCOUNT_ID)
response["repositoryName"].should.equal(repo_name)
json.loads(response["policyText"]).should.equal(policy)
with pytest.raises(ClientError) as e:
client.get_repository_policy(repositoryName=repo_name)
e.value.response["Error"]["Code"].should.contain(
"RepositoryPolicyNotFoundException"
)
@mock_ecr
def test_delete_repository_policy_error_repo_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "not-exists"
# when
with pytest.raises(ClientError) as e:
client.delete_repository_policy(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("DeleteRepositoryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryNotFoundException")
ex.response["Error"]["Message"].should.equal(
f"The repository with name '{repo_name}' does not exist "
f"in the registry with id '{ACCOUNT_ID}'"
)
@mock_ecr
def test_delete_repository_policy_error_policy_not_exists():
# given
region_name = "eu-central-1"
client = boto3.client("ecr", region_name=region_name)
repo_name = "test-repo"
client.create_repository(repositoryName=repo_name)
# when
with pytest.raises(ClientError) as e:
client.delete_repository_policy(repositoryName=repo_name)
# then
ex = e.value
ex.operation_name.should.equal("DeleteRepositoryPolicy")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain("RepositoryPolicyNotFoundException")
ex.response["Error"]["Message"].should.equal(
"Repository policy does not exist "
f"for the repository with name '{repo_name}' "
f"in the registry with id '{ACCOUNT_ID}'"
)