2017-05-22 20:52:17 +00:00
import json
2023-11-30 15:55:51 +00:00
from datetime import datetime
from unittest import SkipTest
2017-05-22 20:52:17 +00:00
2023-11-30 15:55:51 +00:00
import boto3
import pytest
2021-01-31 12:21:24 +00:00
from botocore . exceptions import ClientError
2017-07-25 21:54:05 +00:00
from dateutil . tz import tzlocal
2023-07-17 10:21:32 +00:00
from freezegun import freeze_time
2017-05-22 20:52:17 +00:00
2024-01-07 12:03:33 +00:00
from moto import mock_aws , settings
2022-08-13 09:49:43 +00:00
from moto . core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
2021-08-03 15:21:15 +00:00
2023-07-17 10:21:32 +00:00
from . test_ecr_helpers import _create_image_manifest , _create_image_manifest_list
2017-05-22 20:52:17 +00:00
2023-11-07 18:29:35 +00:00
ECR_REGION = " us-east-1 "
ECR_REPO = " test-repo "
ECR_REPO_NOT_EXIST = " does-not-exist "
2017-05-22 20:52:17 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-22 20:52:17 +00:00
def test_create_repository ( ) :
2021-08-03 15:21:15 +00:00
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-03 15:21:15 +00:00
# when
2023-11-07 18:29:35 +00:00
response = client . create_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
# then
repo = response [ " repository " ]
2023-11-07 18:29:35 +00:00
assert repo [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert (
repo [ " repositoryArn " ]
2023-11-07 18:29:35 +00:00
== f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/ { ECR_REPO } "
2021-08-03 15:21:15 +00:00
)
2023-07-17 10:21:32 +00:00
assert repo [ " registryId " ] == ACCOUNT_ID
assert (
repo [ " repositoryUri " ]
2023-11-07 18:29:35 +00:00
== f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/ { ECR_REPO } "
2021-08-03 15:21:15 +00:00
)
2023-07-17 10:21:32 +00:00
assert isinstance ( repo [ " createdAt " ] , datetime )
assert repo [ " imageTagMutability " ] == " MUTABLE "
assert repo [ " imageScanningConfiguration " ] == { " scanOnPush " : False }
assert repo [ " encryptionConfiguration " ] == { " encryptionType " : " AES256 " }
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_create_repository_with_non_default_config ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
kms_key = f " arn:aws:kms: { region_name } : { ACCOUNT_ID } :key/51d81fab-b138-4bd2-8a09-07fd6d37224d "
# when
response = client . create_repository (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-03 15:21:15 +00:00
imageTagMutability = " IMMUTABLE " ,
imageScanningConfiguration = { " scanOnPush " : True } ,
encryptionConfiguration = { " encryptionType " : " KMS " , " kmsKey " : kms_key } ,
tags = [ { " Key " : " key-1 " , " Value " : " value-1 " } ] ,
)
# then
repo = response [ " repository " ]
2023-11-07 18:29:35 +00:00
assert repo [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert (
repo [ " repositoryArn " ]
2023-11-07 18:29:35 +00:00
== f " arn:aws:ecr: { region_name } : { ACCOUNT_ID } :repository/ { ECR_REPO } "
2023-07-17 10:21:32 +00:00
)
assert repo [ " registryId " ] == ACCOUNT_ID
assert (
repo [ " repositoryUri " ]
2023-11-07 18:29:35 +00:00
== f " { ACCOUNT_ID } .dkr.ecr. { region_name } .amazonaws.com/ { ECR_REPO } "
2023-07-17 10:21:32 +00:00
)
assert isinstance ( repo [ " createdAt " ] , datetime )
assert repo [ " imageTagMutability " ] == " IMMUTABLE "
assert repo [ " imageScanningConfiguration " ] == { " scanOnPush " : True }
assert repo [ " encryptionConfiguration " ] == {
" encryptionType " : " KMS " ,
" kmsKey " : kms_key ,
}
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2022-01-06 22:05:23 +00:00
def test_create_repository_in_different_account ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2022-01-06 22:05:23 +00:00
# when passing in a custom registry ID
response = client . create_repository (
2023-11-07 18:29:35 +00:00
registryId = " 222222222222 " , repositoryName = ECR_REPO
2022-01-06 22:05:23 +00:00
)
# then we should persist this ID
repo = response [ " repository " ]
2023-07-17 10:21:32 +00:00
assert repo [ " registryId " ] == " 222222222222 "
assert (
repo [ " repositoryArn " ]
== " arn:aws:ecr:us-east-1:222222222222:repository/test-repo "
2022-01-06 22:05:23 +00:00
)
# then this repo should be returned with the correct ID
repo = client . describe_repositories ( ) [ " repositories " ] [ 0 ]
2023-07-17 10:21:32 +00:00
assert repo [ " registryId " ] == " 222222222222 "
2022-01-06 22:05:23 +00:00
# then we can search for repos with this ID
response = client . describe_repositories ( registryId = " 222222222222 " )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 1
2022-01-06 22:05:23 +00:00
# then this repo is not found when searching for a different ID
response = client . describe_repositories ( registryId = ACCOUNT_ID )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 0
2022-01-06 22:05:23 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_create_repository_with_aws_managed_kms ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
repo = client . create_repository (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , encryptionConfiguration = { " encryptionType " : " KMS " }
2021-08-03 15:21:15 +00:00
) [ " repository " ]
# then
2023-11-07 18:29:35 +00:00
assert repo [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert repo [ " encryptionConfiguration " ] [ " encryptionType " ] == " KMS "
assert repo [ " encryptionConfiguration " ] [ " kmsKey " ] . startswith (
f " arn:aws:kms:eu-central-1: { ACCOUNT_ID } :key/ "
2021-08-03 15:21:15 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_create_repository_error_already_exists ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " CreateRepository "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryAlreadyExistsException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO } ' already exists in the registry with id ' { ACCOUNT_ID } ' "
2017-05-22 20:52:17 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2023-07-04 15:49:57 +00:00
def test_create_repository_error_name_validation ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2023-07-04 15:49:57 +00:00
repo_name = " tesT "
with pytest . raises ( ClientError ) as e :
client . create_repository ( repositoryName = repo_name )
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " CreateRepository "
assert ex . response [ " Error " ] [ " Code " ] == " InvalidParameterException "
2023-07-04 15:49:57 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-22 20:52:17 +00:00
def test_describe_repositories ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-05-22 20:52:17 +00:00
_ = client . create_repository ( repositoryName = " test_repository1 " )
_ = client . create_repository ( repositoryName = " test_repository0 " )
response = client . describe_repositories ( )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 2
2019-10-31 15:44:26 +00:00
2023-07-17 10:21:32 +00:00
repository_arns = {
2021-08-03 15:21:15 +00:00
f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/test_repository1 " ,
f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/test_repository0 " ,
2023-07-17 10:21:32 +00:00
}
assert {
response [ " repositories " ] [ 0 ] [ " repositoryArn " ] ,
response [ " repositories " ] [ 1 ] [ " repositoryArn " ] ,
} == repository_arns
2019-10-31 15:44:26 +00:00
2023-07-17 10:21:32 +00:00
repository_uris = {
2021-08-03 15:21:15 +00:00
f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/test_repository1 " ,
f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/test_repository0 " ,
2023-07-17 10:21:32 +00:00
}
assert {
response [ " repositories " ] [ 0 ] [ " repositoryUri " ] ,
response [ " repositories " ] [ 1 ] [ " repositoryUri " ] ,
} == repository_uris
2017-05-22 20:52:17 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-23 02:57:14 +00:00
def test_describe_repositories_1 ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-08-10 23:33:38 +00:00
_ = client . create_repository ( repositoryName = " test_repository1 " )
2017-05-23 02:57:14 +00:00
_ = client . create_repository ( repositoryName = " test_repository0 " )
2021-08-03 15:21:15 +00:00
response = client . describe_repositories ( registryId = ACCOUNT_ID )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 2
2019-10-31 15:44:26 +00:00
2023-07-17 10:21:32 +00:00
repository_arns = {
2021-08-03 15:21:15 +00:00
f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/test_repository1 " ,
f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/test_repository0 " ,
2023-07-17 10:21:32 +00:00
}
assert {
response [ " repositories " ] [ 0 ] [ " repositoryArn " ] ,
response [ " repositories " ] [ 1 ] [ " repositoryArn " ] ,
} == repository_arns
2019-10-31 15:44:26 +00:00
2023-07-17 10:21:32 +00:00
repository_uris = {
2021-08-03 15:21:15 +00:00
f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/test_repository1 " ,
f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/test_repository0 " ,
2023-07-17 10:21:32 +00:00
}
assert {
response [ " repositories " ] [ 0 ] [ " repositoryUri " ] ,
response [ " repositories " ] [ 1 ] [ " repositoryUri " ] ,
} == repository_uris
2017-05-23 02:57:14 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-23 02:57:14 +00:00
def test_describe_repositories_2 ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-05-23 02:57:14 +00:00
_ = client . create_repository ( repositoryName = " test_repository1 " )
_ = client . create_repository ( repositoryName = " test_repository0 " )
response = client . describe_repositories ( registryId = " 109876543210 " )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 0
2017-05-23 02:57:14 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-23 02:57:14 +00:00
def test_describe_repositories_3 ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-05-23 02:57:14 +00:00
_ = client . create_repository ( repositoryName = " test_repository1 " )
_ = client . create_repository ( repositoryName = " test_repository0 " )
response = client . describe_repositories ( repositoryNames = [ " test_repository1 " ] )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 1
2021-08-03 15:21:15 +00:00
repository_arn = f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/test_repository1 "
2023-07-17 10:21:32 +00:00
assert response [ " repositories " ] [ 0 ] [ " repositoryArn " ] == repository_arn
2017-05-23 02:57:14 +00:00
2021-08-03 15:21:15 +00:00
repository_uri = f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/test_repository1 "
2023-07-17 10:21:32 +00:00
assert response [ " repositories " ] [ 0 ] [ " repositoryUri " ] == repository_uri
2017-05-23 02:57:14 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-06-20 19:32:32 +00:00
def test_describe_repositories_with_image ( ) :
2021-08-03 15:21:15 +00:00
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2017-06-20 19:32:32 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
2021-08-03 15:21:15 +00:00
# when
2023-11-07 18:29:35 +00:00
response = client . describe_repositories ( repositoryNames = [ ECR_REPO ] )
2021-08-03 15:21:15 +00:00
# then
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 1
2021-08-03 15:21:15 +00:00
repo = response [ " repositories " ] [ 0 ]
2023-07-17 10:21:32 +00:00
assert repo [ " registryId " ] == ACCOUNT_ID
assert (
repo [ " repositoryArn " ]
2023-11-07 18:29:35 +00:00
== f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/ { ECR_REPO } "
2021-08-03 15:21:15 +00:00
)
2023-11-07 18:29:35 +00:00
assert repo [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert (
repo [ " repositoryUri " ]
2023-11-07 18:29:35 +00:00
== f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/ { ECR_REPO } "
2021-08-03 15:21:15 +00:00
)
2023-07-17 10:21:32 +00:00
assert isinstance ( repo [ " createdAt " ] , datetime )
assert repo [ " imageScanningConfiguration " ] == { " scanOnPush " : False }
assert repo [ " imageTagMutability " ] == " MUTABLE "
assert repo [ " encryptionConfiguration " ] == { " encryptionType " : " AES256 " }
2017-06-20 19:32:32 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-22 20:52:17 +00:00
def test_delete_repository ( ) :
2021-08-03 15:21:15 +00:00
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
# when
2023-11-07 18:29:35 +00:00
response = client . delete_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
# then
repo = response [ " repository " ]
2023-11-07 18:29:35 +00:00
assert repo [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert (
repo [ " repositoryArn " ]
2023-11-07 18:29:35 +00:00
== f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/ { ECR_REPO } "
2019-10-31 15:44:26 +00:00
)
2023-07-17 10:21:32 +00:00
assert repo [ " registryId " ] == ACCOUNT_ID
assert (
repo [ " repositoryUri " ]
2023-11-07 18:29:35 +00:00
== f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/ { ECR_REPO } "
2017-05-22 20:52:17 +00:00
)
2023-07-17 10:21:32 +00:00
assert isinstance ( repo [ " createdAt " ] , datetime )
assert repo [ " imageTagMutability " ] == " MUTABLE "
2017-05-22 20:52:17 +00:00
response = client . describe_repositories ( )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 0
2017-05-23 03:50:39 +00:00
2017-05-23 02:57:14 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:08:28 +00:00
def test_delete_repository_with_force ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:08:28 +00:00
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:08:28 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
# when
# when
2023-11-07 18:29:35 +00:00
response = client . delete_repository ( repositoryName = ECR_REPO , force = True )
2021-08-12 05:08:28 +00:00
# then
repo = response [ " repository " ]
2023-11-07 18:29:35 +00:00
assert repo [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert (
repo [ " repositoryArn " ]
2023-11-07 18:29:35 +00:00
== f " arn:aws:ecr:us-east-1: { ACCOUNT_ID } :repository/ { ECR_REPO } "
2021-08-12 05:08:28 +00:00
)
2023-07-17 10:21:32 +00:00
assert repo [ " registryId " ] == ACCOUNT_ID
assert (
repo [ " repositoryUri " ]
2023-11-07 18:29:35 +00:00
== f " { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com/ { ECR_REPO } "
2021-08-12 05:08:28 +00:00
)
2023-07-17 10:21:32 +00:00
assert isinstance ( repo [ " createdAt " ] , datetime )
assert repo [ " imageTagMutability " ] == " MUTABLE "
2021-08-12 05:08:28 +00:00
response = client . describe_repositories ( )
2023-07-17 10:21:32 +00:00
assert len ( response [ " repositories " ] ) == 0
2021-08-12 05:08:28 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-22 20:52:17 +00:00
def test_put_image ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-05-22 20:52:17 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
2017-06-20 19:32:32 +00:00
2017-05-22 20:52:17 +00:00
response = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
2023-07-17 10:21:32 +00:00
assert response [ " image " ] [ " imageId " ] [ " imageTag " ] == " latest "
assert " sha " in response [ " image " ] [ " imageId " ] [ " imageDigest " ]
assert response [ " image " ] [ " repositoryName " ] == " test_repository "
assert response [ " image " ] [ " registryId " ] == ACCOUNT_ID
2017-05-22 20:52:17 +00:00
2019-06-06 12:34:10 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2023-03-30 07:40:57 +00:00
def test_put_image_without_mediatype ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2023-03-30 07:40:57 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
image_manifest = _create_image_manifest ( )
_ = image_manifest . pop ( " mediaType " )
2023-11-09 23:17:46 +00:00
with pytest . raises ( ClientError ) as exc :
2023-07-17 10:21:32 +00:00
client . put_image (
2023-03-30 07:40:57 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( image_manifest ) ,
imageTag = " latest " ,
2023-07-17 10:21:32 +00:00
)
2023-11-09 23:17:46 +00:00
err = exc . value . response [ " Error " ]
assert (
err [ " Message " ]
== " image manifest mediatype not provided in manifest or parameter "
)
2023-03-30 07:40:57 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2023-03-30 07:40:57 +00:00
def test_put_image_with_imagemanifestmediatype ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2023-03-30 07:40:57 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
image_manifest = _create_image_manifest ( )
media_type = image_manifest . pop ( " mediaType " )
response = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( image_manifest ) ,
imageManifestMediaType = media_type ,
imageTag = " latest " ,
)
2023-07-17 10:21:32 +00:00
assert response [ " image " ] [ " imageId " ] [ " imageTag " ] == " latest "
assert " sha " in response [ " image " ] [ " imageId " ] [ " imageDigest " ]
assert response [ " image " ] [ " repositoryName " ] == " test_repository "
assert response [ " image " ] [ " imageManifestMediaType " ] == media_type
assert response [ " image " ] [ " registryId " ] == ACCOUNT_ID
2023-03-30 07:40:57 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws ( )
2022-12-16 18:22:43 +00:00
def test_put_manifest_list ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2022-12-16 18:22:43 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
manifest_list = _create_image_manifest_list ( )
for image_manifest in manifest_list [ " image_manifests " ] :
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( image_manifest ) ,
)
response = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest_list [ " manifest_list " ] ) ,
imageTag = " multiArch " ,
)
2023-07-17 10:21:32 +00:00
assert response [ " image " ] [ " imageId " ] [ " imageTag " ] == " multiArch "
assert " sha " in response [ " image " ] [ " imageId " ] [ " imageDigest " ]
assert response [ " image " ] [ " repositoryName " ] == " test_repository "
assert response [ " image " ] [ " registryId " ] == ACCOUNT_ID
assert " imageManifest " in response [ " image " ]
2022-12-16 18:22:43 +00:00
image_manifest = json . loads ( response [ " image " ] [ " imageManifest " ] )
2023-07-17 10:21:32 +00:00
assert " mediaType " in image_manifest
assert " manifests " in image_manifest
2022-12-16 18:22:43 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2019-06-06 12:34:10 +00:00
def test_put_image_with_push_date ( ) :
2023-07-17 10:21:32 +00:00
if settings . TEST_SERVER_MODE :
2019-06-06 12:34:10 +00:00
raise SkipTest ( " Cant manipulate time in server mode " )
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-06-06 12:34:10 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
with freeze_time ( " 2018-08-28 00:00:00 " ) :
2021-10-30 10:00:40 +00:00
image1_date = datetime . now ( tzlocal ( ) )
2019-06-06 12:34:10 +00:00
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
2022-10-15 09:50:42 +00:00
imageTag = " first " ,
2019-06-06 12:34:10 +00:00
)
with freeze_time ( " 2019-05-31 00:00:00 " ) :
2021-10-30 10:00:40 +00:00
image2_date = datetime . now ( tzlocal ( ) )
2019-06-06 12:34:10 +00:00
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
2022-10-15 09:50:42 +00:00
imageTag = " second " ,
2019-06-06 12:34:10 +00:00
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response [ " imageDetails " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response [ " imageDetails " ] ) == 2
2019-06-06 12:34:10 +00:00
2023-07-17 10:21:32 +00:00
assert {
describe_response [ " imageDetails " ] [ 0 ] [ " imagePushedAt " ] ,
describe_response [ " imageDetails " ] [ 1 ] [ " imagePushedAt " ] ,
} == { image1_date , image2_date }
2019-06-06 12:34:10 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2018-06-13 14:14:18 +00:00
def test_put_image_with_multiple_tags ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2018-06-13 14:14:18 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
response = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = " v1 " ,
)
2023-07-17 10:21:32 +00:00
assert response [ " image " ] [ " imageId " ] [ " imageTag " ] == " v1 "
assert " sha " in response [ " image " ] [ " imageId " ] [ " imageDigest " ]
assert response [ " image " ] [ " repositoryName " ] == " test_repository "
assert response [ " image " ] [ " registryId " ] == ACCOUNT_ID
2018-06-13 14:14:18 +00:00
response1 = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = " latest " ,
)
2023-07-17 10:21:32 +00:00
assert response1 [ " image " ] [ " imageId " ] [ " imageTag " ] == " latest "
assert " sha " in response1 [ " image " ] [ " imageId " ] [ " imageDigest " ]
assert response1 [ " image " ] [ " repositoryName " ] == " test_repository "
assert response1 [ " image " ] [ " registryId " ] == ACCOUNT_ID
2018-06-13 14:14:18 +00:00
response2 = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( response2 [ " imageDetails " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response2 [ " imageDetails " ] ) == 1
2018-06-13 14:14:18 +00:00
2023-07-17 10:21:32 +00:00
assert " sha " in response2 [ " imageDetails " ] [ 0 ] [ " imageDigest " ]
2018-06-14 07:07:09 +00:00
2023-07-17 10:21:32 +00:00
assert response2 [ " imageDetails " ] [ 0 ] [ " registryId " ] == ACCOUNT_ID
assert response2 [ " imageDetails " ] [ 0 ] [ " repositoryName " ] == " test_repository "
2018-06-14 07:07:09 +00:00
2023-07-17 10:21:32 +00:00
assert len ( response2 [ " imageDetails " ] [ 0 ] [ " imageTags " ] ) == 2
assert response2 [ " imageDetails " ] [ 0 ] [ " imageTags " ] == [ " v1 " , " latest " ]
2017-05-22 20:52:17 +00:00
2019-06-06 12:34:10 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2022-10-15 09:50:42 +00:00
def test_put_multiple_images_with_same_tag ( ) :
image_tag = " my-tag "
2023-05-29 20:43:17 +00:00
manifest = json . dumps ( _create_image_manifest ( ) )
2022-10-15 09:50:42 +00:00
client = boto3 . client ( " ecr " , " us-east-1 " )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2022-10-15 09:50:42 +00:00
image_1 = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2022-10-15 09:50:42 +00:00
imageTag = image_tag ,
2023-05-29 20:43:17 +00:00
imageManifest = manifest ,
2022-10-15 09:50:42 +00:00
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
2023-05-29 20:43:17 +00:00
# We should overwrite the first image because the first image
# only has one tag
2022-10-15 09:50:42 +00:00
image_2 = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2022-10-15 09:50:42 +00:00
imageTag = image_tag ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
assert image_1 != image_2
2023-11-07 18:29:35 +00:00
images = client . describe_images ( repositoryName = ECR_REPO ) [ " imageDetails " ]
2022-10-15 09:50:42 +00:00
2023-07-17 10:21:32 +00:00
assert len ( images ) == 1
assert images [ 0 ] [ " imageDigest " ] == image_2
2022-10-15 09:50:42 +00:00
2023-05-29 20:43:17 +00:00
# Same image with different tags is allowed
2022-10-15 09:50:42 +00:00
image_3 = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2022-10-15 09:50:42 +00:00
imageTag = " different-tag " ,
2023-05-29 20:43:17 +00:00
imageManifest = manifest ,
2022-10-15 09:50:42 +00:00
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
2023-11-07 18:29:35 +00:00
images = client . describe_images ( repositoryName = ECR_REPO ) [ " imageDetails " ]
2023-07-17 10:21:32 +00:00
assert len ( images ) == 2
assert set ( [ img [ " imageDigest " ] for img in images ] ) == { image_2 , image_3 }
2022-10-15 09:50:42 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2023-05-29 20:43:17 +00:00
def test_put_same_image_with_same_tag ( ) :
image_tag = " my-tag "
manifest = json . dumps ( _create_image_manifest ( ) )
client = boto3 . client ( " ecr " , " us-east-1 " )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2023-05-29 20:43:17 +00:00
image_1 = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-05-29 20:43:17 +00:00
imageTag = image_tag ,
imageManifest = manifest ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
with pytest . raises ( ClientError ) as e :
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-05-29 20:43:17 +00:00
imageTag = image_tag ,
imageManifest = manifest ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutImage "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert " ImageAlreadyExistsException " in ex . response [ " Error " ] [ " Code " ]
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " Image with digest ' { image_1 } ' and tag ' { image_tag } ' already exists in the repository with name ' { ECR_REPO } ' in registry with id ' { ACCOUNT_ID } ' "
2023-05-29 20:43:17 +00:00
)
2023-11-07 18:29:35 +00:00
images = client . describe_images ( repositoryName = ECR_REPO ) [ " imageDetails " ]
2023-05-29 20:43:17 +00:00
2023-07-17 10:21:32 +00:00
assert len ( images ) == 1
2023-05-29 20:43:17 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2023-06-15 09:28:23 +00:00
def test_multiple_tags__ensure_tags_exist_only_on_one_image ( ) :
tag_to_move = " mock-tag "
image_manifests = {
" image_001 " : json . dumps ( _create_image_manifest ( ) ) ,
" image_002 " : json . dumps ( _create_image_manifest ( ) ) ,
}
client = boto3 . client ( " ecr " , " us-east-1 " )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2023-06-15 09:28:23 +00:00
# Create image with unique tag
for name , manifest in image_manifests . items ( ) :
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-06-15 09:28:23 +00:00
imageTag = name ,
imageManifest = manifest ,
)
# Tag first image with shared tag
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-06-15 09:28:23 +00:00
imageTag = tag_to_move ,
imageManifest = image_manifests [ " image_001 " ] ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
# Image can be found
initial_image , * _ = client . batch_get_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-06-15 09:28:23 +00:00
imageIds = [ { " imageTag " : tag_to_move } ] ,
) [ " images " ]
assert initial_image [ " imageManifest " ] == image_manifests [ " image_001 " ]
# Tag second image with shared tag
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-06-15 09:28:23 +00:00
imageTag = tag_to_move ,
imageManifest = image_manifests [ " image_002 " ] ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
# We should find the second image now - the shared tag should be removed from the first image
new_image , * _ = client . batch_get_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2023-06-15 09:28:23 +00:00
imageIds = [ { " imageTag " : tag_to_move } ] ,
) [ " images " ]
assert new_image [ " imageManifest " ] == image_manifests [ " image_002 " ]
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-22 20:52:17 +00:00
def test_list_images ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-05-23 02:57:14 +00:00
_ = client . create_repository ( repositoryName = " test_repository_1 " )
_ = client . create_repository ( repositoryName = " test_repository_2 " )
2017-05-22 20:52:17 +00:00
_ = client . put_image (
2017-05-23 02:57:14 +00:00
repositoryName = " test_repository_1 " ,
2017-05-22 20:52:17 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
_ = client . put_image (
2017-05-23 02:57:14 +00:00
repositoryName = " test_repository_1 " ,
2017-05-22 20:52:17 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v1 " ,
)
_ = client . put_image (
2017-05-23 02:57:14 +00:00
repositoryName = " test_repository_1 " ,
2017-05-22 20:52:17 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v2 " ,
)
2017-05-23 02:57:14 +00:00
_ = client . put_image (
repositoryName = " test_repository_2 " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " oldest " ,
)
response = client . list_images ( repositoryName = " test_repository_1 " )
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageIds " ] ) == 3
2017-05-22 20:52:17 +00:00
2020-11-05 14:10:23 +00:00
for image in response [ " imageIds " ] :
2023-07-17 10:21:32 +00:00
assert " sha " in image [ " imageDigest " ]
2020-11-05 14:10:23 +00:00
2017-05-22 20:52:17 +00:00
image_tags = [ " latest " , " v1 " , " v2 " ]
2023-07-17 10:21:32 +00:00
assert {
response [ " imageIds " ] [ 0 ] [ " imageTag " ] ,
response [ " imageIds " ] [ 1 ] [ " imageTag " ] ,
response [ " imageIds " ] [ 2 ] [ " imageTag " ] ,
} == set ( image_tags )
2017-05-22 20:52:17 +00:00
2017-05-23 02:57:14 +00:00
response = client . list_images ( repositoryName = " test_repository_2 " )
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageIds " ] ) == 1
assert response [ " imageIds " ] [ 0 ] [ " imageTag " ] == " oldest "
assert " sha " in response [ " imageIds " ] [ 0 ] [ " imageDigest " ]
2017-05-23 02:57:14 +00:00
2018-06-25 16:34:10 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2018-06-25 16:34:10 +00:00
def test_list_images_from_repository_that_doesnt_exist ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2018-06-25 16:34:10 +00:00
_ = client . create_repository ( repositoryName = " test_repository_1 " )
# non existing repo
2023-07-17 10:21:32 +00:00
with pytest . raises ( ClientError ) as exc :
client . list_images ( repositoryName = " repo-that-doesnt-exist " , registryId = " 123 " )
err = exc . value . response [ " Error " ]
assert err [ " Code " ] == " RepositoryNotFoundException "
2018-06-25 16:34:10 +00:00
# repo does not exist in specified registry
2023-07-17 10:21:32 +00:00
with pytest . raises ( ClientError ) as exc :
client . list_images ( repositoryName = " test_repository_1 " , registryId = " 222 " )
err = exc . value . response [ " Error " ]
assert err [ " Code " ] == " RepositoryNotFoundException "
2017-05-23 02:57:14 +00:00
2017-05-22 20:52:17 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-05-22 20:52:17 +00:00
def test_describe_images ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-05-22 20:52:17 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
2018-06-13 14:14:18 +00:00
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
)
2017-05-22 20:52:17 +00:00
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v1 " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v2 " ,
)
2019-10-31 15:44:26 +00:00
2022-12-16 18:22:43 +00:00
manifest_list = _create_image_manifest_list ( )
for image_manifest in manifest_list [ " image_manifests " ] :
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( image_manifest ) ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest_list [ " manifest_list " ] ) ,
imageTag = " multiArch " ,
)
2017-05-22 20:52:17 +00:00
response = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " imageDetails " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageDetails " ] ) == 7
2022-12-16 18:22:43 +00:00
2023-07-17 10:21:32 +00:00
for detail in response [ " imageDetails " ] [ 0 : 5 ] :
assert " distribution.manifest.v2+json " in detail [ " imageManifestMediaType " ]
assert " sha " in detail [ " imageDigest " ]
assert detail [ " registryId " ] == ACCOUNT_ID
assert detail [ " repositoryName " ] == " test_repository "
2019-10-31 15:44:26 +00:00
2023-07-17 10:21:32 +00:00
assert " imageTags " not in response [ " imageDetails " ] [ 0 ]
assert " imageTags " not in response [ " imageDetails " ] [ 4 ]
assert " imageTags " not in response [ " imageDetails " ] [ 5 ]
2019-10-31 15:44:26 +00:00
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageDetails " ] [ 1 ] [ " imageTags " ] ) == 1
assert len ( response [ " imageDetails " ] [ 2 ] [ " imageTags " ] ) == 1
assert len ( response [ " imageDetails " ] [ 3 ] [ " imageTags " ] ) == 1
assert len ( response [ " imageDetails " ] [ 6 ] [ " imageTags " ] ) == 1
2019-10-31 15:44:26 +00:00
2017-05-22 20:52:17 +00:00
image_tags = [ " latest " , " v1 " , " v2 " ]
2023-07-17 10:21:32 +00:00
assert {
response [ " imageDetails " ] [ 1 ] [ " imageTags " ] [ 0 ] ,
response [ " imageDetails " ] [ 2 ] [ " imageTags " ] [ 0 ] ,
response [ " imageDetails " ] [ 3 ] [ " imageTags " ] [ 0 ] ,
} == set ( image_tags )
2017-05-22 20:52:17 +00:00
2023-07-17 10:21:32 +00:00
assert " imageSizeInBytes " not in response [ " imageDetails " ] [ 6 ]
2022-12-16 18:22:43 +00:00
2023-07-17 10:21:32 +00:00
assert response [ " imageDetails " ] [ 0 ] [ " imageSizeInBytes " ] > 0
assert response [ " imageDetails " ] [ 1 ] [ " imageSizeInBytes " ] > 0
assert response [ " imageDetails " ] [ 2 ] [ " imageSizeInBytes " ] > 0
assert response [ " imageDetails " ] [ 3 ] [ " imageSizeInBytes " ] > 0
assert response [ " imageDetails " ] [ 4 ] [ " imageSizeInBytes " ] > 0
assert response [ " imageDetails " ] [ 5 ] [ " imageSizeInBytes " ] > 0
2017-06-20 20:22:34 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-06-20 20:22:34 +00:00
def test_describe_images_by_tag ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-06-20 20:22:34 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
tag_map = { }
for tag in [ " latest " , " v1 " , " v2 " ] :
put_response = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = tag ,
)
tag_map [ tag ] = put_response [ " image " ]
for tag , put_response in tag_map . items ( ) :
response = client . describe_images (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : tag } ]
)
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageDetails " ] ) == 1
2017-06-20 20:22:34 +00:00
image_detail = response [ " imageDetails " ] [ 0 ]
2023-07-17 10:21:32 +00:00
assert image_detail [ " registryId " ] == ACCOUNT_ID
assert image_detail [ " repositoryName " ] == " test_repository "
assert image_detail [ " imageTags " ] == [ put_response [ " imageId " ] [ " imageTag " ] ]
assert image_detail [ " imageDigest " ] == put_response [ " imageId " ] [ " imageDigest " ]
2017-06-20 20:22:34 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2018-06-14 07:53:11 +00:00
def test_describe_images_tags_should_not_contain_empty_tag1 ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2018-06-14 07:53:11 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
client . put_image (
repositoryName = " test_repository " , imageManifest = json . dumps ( manifest )
)
tags = [ " v1 " , " v2 " , " latest " ]
for tag in tags :
client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
response = client . describe_images (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : tag } ]
)
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageDetails " ] ) == 1
2018-06-14 07:53:11 +00:00
image_detail = response [ " imageDetails " ] [ 0 ]
2023-07-17 10:21:32 +00:00
assert len ( image_detail [ " imageTags " ] ) == 3
assert image_detail [ " imageTags " ] == tags
2018-06-14 07:53:11 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2018-06-14 07:53:11 +00:00
def test_describe_images_tags_should_not_contain_empty_tag2 ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2018-06-14 07:53:11 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " v2 " ]
for tag in tags :
client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
client . put_image (
repositoryName = " test_repository " , imageManifest = json . dumps ( manifest )
)
client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = " latest " ,
)
response = client . describe_images (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : tag } ]
)
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageDetails " ] ) == 1
2018-06-14 07:53:11 +00:00
image_detail = response [ " imageDetails " ] [ 0 ]
2023-07-17 10:21:32 +00:00
assert len ( image_detail [ " imageTags " ] ) == 3
assert image_detail [ " imageTags " ] == [ " v1 " , " v2 " , " latest " ]
2018-06-14 07:53:11 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-08-10 23:33:38 +00:00
def test_describe_repository_that_doesnt_exist ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-08-10 23:33:38 +00:00
2023-07-17 10:21:32 +00:00
with pytest . raises ( ClientError ) as exc :
client . describe_repositories (
repositoryNames = [ " repo-that-doesnt-exist " ] , registryId = " 123 "
)
err = exc . value . response [ " Error " ]
assert err [ " Code " ] == " RepositoryNotFoundException "
2017-08-10 23:33:38 +00:00
2019-10-31 15:44:26 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-08-10 23:33:38 +00:00
def test_describe_image_that_doesnt_exist ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-08-10 23:33:38 +00:00
client . create_repository ( repositoryName = " test_repository " )
2023-07-17 10:21:32 +00:00
with pytest . raises ( ClientError ) as exc :
client . describe_images (
repositoryName = " test_repository " ,
imageIds = [ { " imageTag " : " testtag " } ] ,
registryId = ACCOUNT_ID ,
)
err = exc . value . response [ " Error " ]
assert err [ " Code " ] == " ImageNotFoundException "
with pytest . raises ( ClientError ) as exc :
client . describe_images (
repositoryName = " repo-that-doesnt-exist " ,
imageIds = [ { " imageTag " : " testtag " } ] ,
registryId = ACCOUNT_ID ,
)
err = exc . value . response [ " Error " ]
assert err [ " Code " ] == " RepositoryNotFoundException "
2017-08-10 23:33:38 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-08-10 23:33:38 +00:00
def test_delete_repository_that_doesnt_exist ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-03 15:21:15 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . delete_repository ( repositoryName = ECR_REPO_NOT_EXIST )
2021-08-03 15:21:15 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteRepository "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-03 15:21:15 +00:00
)
2017-08-10 23:33:38 +00:00
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_delete_repository_error_not_empty ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-03 15:21:15 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
2017-08-10 23:33:38 +00:00
)
2021-08-03 15:21:15 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . delete_repository ( repositoryName = ECR_REPO )
2021-08-03 15:21:15 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteRepository "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotEmptyException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO } ' in registry with id ' { ACCOUNT_ID } ' cannot be deleted because it still contains images "
2021-08-03 15:21:15 +00:00
)
2017-08-10 23:33:38 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-06-20 20:22:34 +00:00
def test_describe_images_by_digest ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-06-20 20:22:34 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
tags = [ " latest " , " v1 " , " v2 " ]
digest_map = { }
for tag in tags :
put_response = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = tag ,
)
digest_map [ put_response [ " image " ] [ " imageId " ] [ " imageDigest " ] ] = put_response [
" image "
]
for digest , put_response in digest_map . items ( ) :
response = client . describe_images (
repositoryName = " test_repository " , imageIds = [ { " imageDigest " : digest } ]
)
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageDetails " ] ) == 1
2017-06-20 20:22:34 +00:00
image_detail = response [ " imageDetails " ] [ 0 ]
2023-07-17 10:21:32 +00:00
assert image_detail [ " registryId " ] == ACCOUNT_ID
assert image_detail [ " repositoryName " ] == " test_repository "
assert image_detail [ " imageTags " ] == [ put_response [ " imageId " ] [ " imageTag " ] ]
assert image_detail [ " imageDigest " ] == digest
2017-07-25 21:54:05 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-07-25 21:54:05 +00:00
def test_get_authorization_token_assume_region ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-07-25 21:54:05 +00:00
auth_token_response = client . get_authorization_token ( )
2023-07-17 10:21:32 +00:00
assert " authorizationData " in auth_token_response
assert " ResponseMetadata " in auth_token_response
assert auth_token_response [ " authorizationData " ] == [
{
" authorizationToken " : " QVdTOjEyMzQ1Njc4OTAxMi1hdXRoLXRva2Vu " ,
" proxyEndpoint " : f " https:// { ACCOUNT_ID } .dkr.ecr.us-east-1.amazonaws.com " ,
" expiresAt " : datetime ( 2015 , 1 , 1 , tzinfo = tzlocal ( ) ) ,
}
]
2017-07-25 21:54:05 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-07-25 21:54:05 +00:00
def test_get_authorization_token_explicit_regions ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2018-05-09 07:20:48 +00:00
auth_token_response = client . get_authorization_token (
registryIds = [ " 10987654321 " , " 878787878787 " ]
2017-07-25 21:54:05 +00:00
)
2023-07-17 10:21:32 +00:00
assert " authorizationData " in auth_token_response
assert " ResponseMetadata " in auth_token_response
assert auth_token_response [ " authorizationData " ] == [
{
" authorizationToken " : " QVdTOjEwOTg3NjU0MzIxLWF1dGgtdG9rZW4= " ,
" proxyEndpoint " : " https://10987654321.dkr.ecr.us-east-1.amazonaws.com " ,
" expiresAt " : datetime ( 2015 , 1 , 1 , tzinfo = tzlocal ( ) ) ,
} ,
{
" authorizationToken " : " QVdTOjg3ODc4Nzg3ODc4Ny1hdXRoLXRva2Vu " ,
" proxyEndpoint " : " https://878787878787.dkr.ecr.us-east-1.amazonaws.com " ,
" expiresAt " : datetime ( 2015 , 1 , 1 , tzinfo = tzlocal ( ) ) ,
} ,
]
2017-12-31 02:39:23 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-12-31 02:39:23 +00:00
def test_batch_get_image ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-12-31 02:39:23 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v1 " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v2 " ,
)
response = client . batch_get_image (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : " v2 " } ]
)
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " images " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " images " ] ) == 1
2017-12-31 02:39:23 +00:00
2023-07-17 10:21:32 +00:00
assert (
2017-12-31 02:39:23 +00:00
" vnd.docker.distribution.manifest.v2+json "
2023-07-17 10:21:32 +00:00
in response [ " images " ] [ 0 ] [ " imageManifest " ]
2017-12-31 02:39:23 +00:00
)
2023-07-17 10:21:32 +00:00
assert response [ " images " ] [ 0 ] [ " registryId " ] == ACCOUNT_ID
assert response [ " images " ] [ 0 ] [ " repositoryName " ] == " test_repository "
2017-12-31 02:39:23 +00:00
2023-07-17 10:21:32 +00:00
assert response [ " images " ] [ 0 ] [ " imageId " ] [ " imageTag " ] == " v2 "
assert " sha " in response [ " images " ] [ 0 ] [ " imageId " ] [ " imageDigest " ]
2017-12-31 02:39:23 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " failures " ] ) == 0
2017-12-31 02:39:23 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2017-12-31 02:39:23 +00:00
def test_batch_get_image_that_doesnt_exist ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2017-12-31 02:39:23 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v1 " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v2 " ,
)
response = client . batch_get_image (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : " v5 " } ]
)
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " images " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " images " ] ) == 0
2017-12-31 02:39:23 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( response [ " failures " ] ) == 1
assert response [ " failures " ] [ 0 ] [ " failureReason " ] == " Requested image not found "
assert response [ " failures " ] [ 0 ] [ " failureCode " ] == " ImageNotFound "
assert response [ " failures " ] [ 0 ] [ " imageId " ] [ " imageTag " ] == " v5 "
2017-12-31 02:39:23 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2023-04-19 16:55:28 +00:00
def test_batch_get_image_with_multiple_tags ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2023-04-19 16:55:28 +00:00
_ = client . create_repository ( repositoryName = " test_repository " )
manifest = json . dumps ( _create_image_manifest ( ) )
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = manifest ,
imageTag = " latest " ,
)
_ = client . put_image (
repositoryName = " test_repository " ,
imageManifest = manifest ,
imageTag = " v1 " ,
)
latest_response = client . batch_get_image (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : " latest " } ]
)
v1_response = client . batch_get_image (
repositoryName = " test_repository " , imageIds = [ { " imageTag " : " v1 " } ]
)
2023-07-17 10:21:32 +00:00
assert (
latest_response [ " images " ] [ 0 ] [ " imageManifest " ]
== v1_response [ " images " ] [ 0 ] [ " imageManifest " ]
2023-04-19 16:55:28 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_by_tag ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " v1.0 " , " latest " ]
for tag in tags :
2019-06-17 17:41:35 +00:00
client . put_image (
2019-05-30 17:16:19 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
describe_response1 = client . describe_images ( repositoryName = " test_repository " )
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageTag " : " latest " } ] ,
)
describe_response2 = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response1 [ " imageDetails " ] [ 0 ] [ " imageTags " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response1 [ " imageDetails " ] [ 0 ] [ " imageTags " ] ) == 3
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response2 [ " imageDetails " ] [ 0 ] [ " imageTags " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response2 [ " imageDetails " ] [ 0 ] [ " imageTags " ] ) == 2
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 1
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " imageIds " ] [ 0 ] [ " imageTag " ] == " latest "
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 0
2019-05-30 17:16:19 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2019-06-17 17:41:35 +00:00
def test_batch_delete_image_delete_last_tag ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-06-17 17:41:35 +00:00
client . create_repository ( repositoryName = " test_repository " )
client . put_image (
repositoryName = " test_repository " ,
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " v1 " ,
)
describe_response1 = client . describe_images ( repositoryName = " test_repository " )
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageTag " : " v1 " } ] ,
)
describe_response2 = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response1 [ " imageDetails " ] [ 0 ] [ " imageTags " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response1 [ " imageDetails " ] [ 0 ] [ " imageTags " ] ) == 1
2019-06-17 17:41:35 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response2 [ " imageDetails " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response2 [ " imageDetails " ] ) == 0
2019-06-17 17:41:35 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 1
2019-06-17 17:41:35 +00:00
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " imageIds " ] [ 0 ] [ " imageTag " ] == " v1 "
2019-06-17 17:41:35 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 0
2019-06-17 17:41:35 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_with_nonexistent_tag ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " v1.0 " , " latest " ]
for tag in tags :
2019-06-17 17:41:35 +00:00
client . put_image (
2019-05-30 17:16:19 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
missing_tag = " missing-tag "
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageTag " : missing_tag } ] ,
)
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response [ " imageDetails " ] [ 0 ] [ " imageTags " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response [ " imageDetails " ] [ 0 ] [ " imageTags " ] ) == 3
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 0
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " failures " ] [ 0 ] [ " imageId " ] [ " imageTag " ] == missing_tag
assert batch_delete_response [ " failures " ] [ 0 ] [ " failureCode " ] == " ImageNotFound "
assert (
batch_delete_response [ " failures " ] [ 0 ] [ " failureReason " ]
== " Requested image not found "
2019-05-30 17:16:19 +00:00
)
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 1
2019-05-30 17:16:19 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_by_digest ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " v2 " , " latest " ]
for tag in tags :
2019-06-17 17:41:35 +00:00
client . put_image (
2019-05-30 17:16:19 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
image_digest = describe_response [ " imageDetails " ] [ 0 ] [ " imageDigest " ]
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageDigest " : image_digest } ] ,
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response [ " imageDetails " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response [ " imageDetails " ] ) == 0
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 3
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " imageIds " ] [ 0 ] [ " imageDigest " ] == image_digest
assert batch_delete_response [ " imageIds " ] [ 1 ] [ " imageDigest " ] == image_digest
assert batch_delete_response [ " imageIds " ] [ 2 ] [ " imageDigest " ] == image_digest
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert {
batch_delete_response [ " imageIds " ] [ 0 ] [ " imageTag " ] ,
batch_delete_response [ " imageIds " ] [ 1 ] [ " imageTag " ] ,
batch_delete_response [ " imageIds " ] [ 2 ] [ " imageTag " ] ,
} == set ( tags )
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 0
2019-05-30 17:16:19 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_with_invalid_digest ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " v2 " , " latest " ]
for tag in tags :
2019-06-17 17:41:35 +00:00
client . put_image (
2019-05-30 17:16:19 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
invalid_image_digest = " sha256:invalid-digest "
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageDigest " : invalid_image_digest } ] ,
)
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 0
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 1
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert (
batch_delete_response [ " failures " ] [ 0 ] [ " imageId " ] [ " imageDigest " ]
== invalid_image_digest
2019-05-30 17:16:19 +00:00
)
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " failures " ] [ 0 ] [ " failureCode " ] == " InvalidImageDigest "
assert (
batch_delete_response [ " failures " ] [ 0 ] [ " failureReason " ]
== " Invalid request parameters: image digest should satisfy the regex ' [a-zA-Z0-9-_+.]+:[a-fA-F0-9]+ ' "
2019-05-30 17:16:19 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_with_missing_parameters ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " , repositoryName = " test_repository " , imageIds = [ { } ]
)
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 0
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 1
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " failures " ] [ 0 ] [ " failureCode " ] == " MissingDigestAndTag "
assert (
batch_delete_response [ " failures " ] [ 0 ] [ " failureReason " ]
== " Invalid request parameters: both tag and digest cannot be null "
2019-05-30 17:16:19 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_with_matching_digest_and_tag ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " v1.0 " , " latest " ]
for tag in tags :
2019-06-17 17:41:35 +00:00
client . put_image (
2019-05-30 17:16:19 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
image_digest = describe_response [ " imageDetails " ] [ 0 ] [ " imageDigest " ]
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageDigest " : image_digest , " imageTag " : " v1 " } ] ,
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
2023-09-06 13:08:35 +00:00
assert isinstance ( describe_response [ " imageDetails " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( describe_response [ " imageDetails " ] ) == 0
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 3
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " imageIds " ] [ 0 ] [ " imageDigest " ] == image_digest
assert batch_delete_response [ " imageIds " ] [ 1 ] [ " imageDigest " ] == image_digest
assert batch_delete_response [ " imageIds " ] [ 2 ] [ " imageDigest " ] == image_digest
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert {
batch_delete_response [ " imageIds " ] [ 0 ] [ " imageTag " ] ,
batch_delete_response [ " imageIds " ] [ 1 ] [ " imageTag " ] ,
batch_delete_response [ " imageIds " ] [ 2 ] [ " imageTag " ] ,
} == set ( tags )
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 0
2019-05-30 17:16:19 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2019-05-30 17:16:19 +00:00
def test_batch_delete_image_with_mismatched_digest_and_tag ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2019-05-30 17:16:19 +00:00
client . create_repository ( repositoryName = " test_repository " )
manifest = _create_image_manifest ( )
tags = [ " v1 " , " latest " ]
for tag in tags :
2019-06-17 17:41:35 +00:00
client . put_image (
2019-05-30 17:16:19 +00:00
repositoryName = " test_repository " ,
imageManifest = json . dumps ( manifest ) ,
imageTag = tag ,
)
describe_response = client . describe_images ( repositoryName = " test_repository " )
image_digest = describe_response [ " imageDetails " ] [ 0 ] [ " imageDigest " ]
batch_delete_response = client . batch_delete_image (
registryId = " 012345678910 " ,
repositoryName = " test_repository " ,
imageIds = [ { " imageDigest " : image_digest , " imageTag " : " v2 " } ] ,
)
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " imageIds " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " imageIds " ] ) == 0
2019-05-30 17:16:19 +00:00
2023-09-06 13:08:35 +00:00
assert isinstance ( batch_delete_response [ " failures " ] , list )
2023-07-17 10:21:32 +00:00
assert len ( batch_delete_response [ " failures " ] ) == 1
2019-05-30 17:16:19 +00:00
2023-07-17 10:21:32 +00:00
assert (
batch_delete_response [ " failures " ] [ 0 ] [ " imageId " ] [ " imageDigest " ] == image_digest
2019-05-30 17:16:19 +00:00
)
2023-07-17 10:21:32 +00:00
assert batch_delete_response [ " failures " ] [ 0 ] [ " imageId " ] [ " imageTag " ] == " v2 "
assert batch_delete_response [ " failures " ] [ 0 ] [ " failureCode " ] == " ImageNotFound "
assert (
batch_delete_response [ " failures " ] [ 0 ] [ " failureReason " ]
== " Requested image not found "
2019-05-30 17:16:19 +00:00
)
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2022-03-25 15:11:54 +00:00
def test_delete_batch_image_with_multiple_images ( ) :
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2022-03-25 15:11:54 +00:00
# Populate mock repo with images
for i in range ( 10 ) :
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2022-11-23 13:16:33 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = f " tag { i } " ,
2022-03-25 15:11:54 +00:00
)
# Pull down image digests for each image in the mock repo
2023-11-07 18:29:35 +00:00
repo_images = client . describe_images ( repositoryName = ECR_REPO ) [ " imageDetails " ]
2022-03-25 15:11:54 +00:00
image_digests = [ { " imageDigest " : image [ " imageDigest " ] } for image in repo_images ]
# Pick a couple of images to delete
images_to_delete = image_digests [ 5 : 7 ]
# Delete the images
response = client . batch_delete_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageIds = images_to_delete
2022-03-25 15:11:54 +00:00
)
2023-07-17 10:21:32 +00:00
assert len ( response [ " imageIds " ] ) == 2
assert response [ " failures " ] == [ ]
2022-03-25 15:11:54 +00:00
# Verify other images still exist
2023-11-07 18:29:35 +00:00
repo_images = client . describe_images ( repositoryName = ECR_REPO ) [ " imageDetails " ]
2022-03-25 15:11:54 +00:00
image_tags = [ img [ " imageTags " ] [ 0 ] for img in repo_images ]
2023-07-17 10:21:32 +00:00
assert image_tags == [
" tag0 " ,
" tag1 " ,
" tag2 " ,
" tag3 " ,
" tag4 " ,
" tag7 " ,
" tag8 " ,
" tag9 " ,
]
2022-03-25 15:11:54 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_list_tags_for_resource ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-03 15:21:15 +00:00
arn = client . create_repository (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , tags = [ { " Key " : " key-1 " , " Value " : " value-1 " } ]
2021-08-03 15:21:15 +00:00
) [ " repository " ] [ " repositoryArn " ]
# when
tags = client . list_tags_for_resource ( resourceArn = arn ) [ " tags " ]
# then
2023-07-17 10:21:32 +00:00
assert tags == [ { " Key " : " key-1 " , " Value " : " value-1 " } ]
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_list_tags_for_resource_error_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . list_tags_for_resource (
2023-11-07 18:29:35 +00:00
resourceArn = f " arn:aws:ecr: { region_name } : { ACCOUNT_ID } :repository/ { ECR_REPO } "
2021-08-03 15:21:15 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " ListTagsForResource "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-03 15:21:15 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-05 13:23:11 +00:00
def test_list_tags_for_resource_error_invalid_param ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . list_tags_for_resource ( resourceArn = " invalid " )
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " ListTagsForResource "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " InvalidParameterException "
assert (
ex . response [ " Error " ] [ " Message " ]
== " Invalid parameter at ' resourceArn ' failed to satisfy constraint: ' Invalid ARN ' "
2021-08-05 13:23:11 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_tag_resource ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-03 15:21:15 +00:00
arn = client . create_repository (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , tags = [ { " Key " : " key-1 " , " Value " : " value-1 " } ]
2021-08-03 15:21:15 +00:00
) [ " repository " ] [ " repositoryArn " ]
# when
client . tag_resource ( resourceArn = arn , tags = [ { " Key " : " key-2 " , " Value " : " value-2 " } ] )
# then
tags = client . list_tags_for_resource ( resourceArn = arn ) [ " tags " ]
2023-07-17 10:21:32 +00:00
assert sorted ( tags , key = lambda i : i [ " Key " ] ) == [
{ " Key " : " key-1 " , " Value " : " value-1 " } ,
{ " Key " : " key-2 " , " Value " : " value-2 " } ,
]
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_tag_resource_error_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . tag_resource (
2023-11-07 18:29:35 +00:00
resourceArn = f " arn:aws:ecr: { region_name } : { ACCOUNT_ID } :repository/ { ECR_REPO } " ,
2021-08-03 15:21:15 +00:00
tags = [ { " Key " : " key-1 " , " Value " : " value-2 " } ] ,
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " TagResource "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-03 15:21:15 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_untag_resource ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-03 15:21:15 +00:00
arn = client . create_repository (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-03 15:21:15 +00:00
tags = [
{ " Key " : " key-1 " , " Value " : " value-1 " } ,
{ " Key " : " key-2 " , " Value " : " value-2 " } ,
] ,
) [ " repository " ] [ " repositoryArn " ]
# when
client . untag_resource ( resourceArn = arn , tagKeys = [ " key-1 " ] )
# then
tags = client . list_tags_for_resource ( resourceArn = arn ) [ " tags " ]
2023-07-17 10:21:32 +00:00
assert tags == [ { " Key " : " key-2 " , " Value " : " value-2 " } ]
2021-08-03 15:21:15 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-03 15:21:15 +00:00
def test_untag_resource_error_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . untag_resource (
2023-11-07 18:29:35 +00:00
resourceArn = f " arn:aws:ecr: { region_name } : { ACCOUNT_ID } :repository/ { ECR_REPO } " ,
2021-08-03 15:21:15 +00:00
tagKeys = [ " key-1 " ] ,
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " UntagResource "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-03 15:21:15 +00:00
)
2021-08-05 13:23:11 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-05 13:23:11 +00:00
def test_put_image_tag_mutability ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-05 13:23:11 +00:00
2023-11-07 18:29:35 +00:00
response = client . describe_repositories ( repositoryNames = [ ECR_REPO ] )
2023-07-17 10:21:32 +00:00
assert response [ " repositories " ] [ 0 ] [ " imageTagMutability " ] == " MUTABLE "
2021-08-05 13:23:11 +00:00
# when
response = client . put_image_tag_mutability (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageTagMutability = " IMMUTABLE "
2021-08-05 13:23:11 +00:00
)
# then
2023-07-17 10:21:32 +00:00
assert response [ " imageTagMutability " ] == " IMMUTABLE "
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2021-08-05 13:23:11 +00:00
2023-11-07 18:29:35 +00:00
response = client . describe_repositories ( repositoryNames = [ ECR_REPO ] )
2023-07-17 10:21:32 +00:00
assert response [ " repositories " ] [ 0 ] [ " imageTagMutability " ] == " IMMUTABLE "
2021-08-05 13:23:11 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-05 13:23:11 +00:00
def test_put_image_tag_mutability_error_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . put_image_tag_mutability (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO_NOT_EXIST , imageTagMutability = " IMMUTABLE "
2021-08-05 13:23:11 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutImageTagMutability "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-05 13:23:11 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-05 13:23:11 +00:00
def test_put_image_tag_mutability_error_invalid_param ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-05 13:23:11 +00:00
# when
with pytest . raises ( ClientError ) as e :
client . put_image_tag_mutability (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageTagMutability = " invalid "
2021-08-05 13:23:11 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutImageTagMutability "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " InvalidParameterException "
assert (
ex . response [ " Error " ] [ " Message " ]
== " Invalid parameter at ' imageTagMutability ' failed to satisfy constraint: ' Member must satisfy enum value set: [IMMUTABLE, MUTABLE] ' "
2021-08-05 13:23:11 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-05 13:23:11 +00:00
def test_put_image_scanning_configuration ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-05 13:23:11 +00:00
2023-11-07 18:29:35 +00:00
response = client . describe_repositories ( repositoryNames = [ ECR_REPO ] )
2023-07-17 10:21:32 +00:00
assert response [ " repositories " ] [ 0 ] [ " imageScanningConfiguration " ] == {
" scanOnPush " : False
}
2021-08-05 13:23:11 +00:00
# when
response = client . put_image_scanning_configuration (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageScanningConfiguration = { " scanOnPush " : True }
2021-08-05 13:23:11 +00:00
)
# then
2023-07-17 10:21:32 +00:00
assert response [ " imageScanningConfiguration " ] == { " scanOnPush " : True }
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2021-08-05 13:23:11 +00:00
2023-11-07 18:29:35 +00:00
response = client . describe_repositories ( repositoryNames = [ ECR_REPO ] )
2023-07-17 10:21:32 +00:00
assert response [ " repositories " ] [ 0 ] [ " imageScanningConfiguration " ] == {
" scanOnPush " : True
}
2021-08-05 13:23:11 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-05 13:23:11 +00:00
def test_put_image_scanning_configuration_error_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . put_image_scanning_configuration (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO_NOT_EXIST ,
imageScanningConfiguration = { " scanOnPush " : True } ,
2021-08-05 13:23:11 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutImageScanningConfiguration "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-05 13:23:11 +00:00
)
2021-08-07 07:48:28 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_set_repository_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Sid " : " root " ,
" Effect " : " Allow " ,
" Principal " : { " AWS " : f " arn:aws:iam:: { ACCOUNT_ID } :root " } ,
" Action " : [ " ecr:DescribeImages " ] ,
}
] ,
}
# when
response = client . set_repository_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , policyText = json . dumps ( policy )
2021-08-07 07:48:28 +00:00
)
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert json . loads ( response [ " policyText " ] ) == policy
2021-08-07 07:48:28 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_set_repository_policy_error_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Sid " : " root " ,
" Effect " : " Allow " ,
" Principal " : { " AWS " : f " arn:aws:iam:: { ACCOUNT_ID } :root " } ,
" Action " : [ " ecr:DescribeImages " ] ,
}
] ,
}
# when
with pytest . raises ( ClientError ) as e :
client . set_repository_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO_NOT_EXIST , policyText = json . dumps ( policy )
2021-08-07 07:48:28 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " SetRepositoryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-07 07:48:28 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_set_repository_policy_error_invalid_param ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [ { " Effect " : " Allow " } ] ,
}
# when
with pytest . raises ( ClientError ) as e :
client . set_repository_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , policyText = json . dumps ( policy )
2021-08-07 07:48:28 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " SetRepositoryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " InvalidParameterException "
assert (
ex . response [ " Error " ] [ " Message " ]
== " Invalid parameter at ' PolicyText ' failed to satisfy constraint: ' Invalid repository policy provided ' "
2021-08-07 07:48:28 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_get_repository_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Sid " : " root " ,
" Effect " : " Allow " ,
" Principal " : { " AWS " : f " arn:aws:iam:: { ACCOUNT_ID } :root " } ,
" Action " : [ " ecr:DescribeImages " ] ,
}
] ,
}
2023-11-07 18:29:35 +00:00
client . set_repository_policy ( repositoryName = ECR_REPO , policyText = json . dumps ( policy ) )
2021-08-07 07:48:28 +00:00
# when
2023-11-07 18:29:35 +00:00
response = client . get_repository_policy ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert json . loads ( response [ " policyText " ] ) == policy
2021-08-07 07:48:28 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_get_repository_policy_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . get_repository_policy ( repositoryName = ECR_REPO_NOT_EXIST )
2021-08-07 07:48:28 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " GetRepositoryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-07 07:48:28 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_get_repository_policy_error_policy_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . get_repository_policy ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " GetRepositoryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryPolicyNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " Repository policy does not exist for the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-07 07:48:28 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_delete_repository_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Sid " : " root " ,
" Effect " : " Allow " ,
" Principal " : { " AWS " : f " arn:aws:iam:: { ACCOUNT_ID } :root " } ,
" Action " : [ " ecr:DescribeImages " ] ,
}
] ,
}
2023-11-07 18:29:35 +00:00
client . set_repository_policy ( repositoryName = ECR_REPO , policyText = json . dumps ( policy ) )
2021-08-07 07:48:28 +00:00
# when
2023-11-07 18:29:35 +00:00
response = client . delete_repository_policy ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert json . loads ( response [ " policyText " ] ) == policy
2021-08-07 07:48:28 +00:00
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . get_repository_policy ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
2023-07-17 10:21:32 +00:00
assert e . value . response [ " Error " ] [ " Code " ] == " RepositoryPolicyNotFoundException "
2021-08-07 07:48:28 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_delete_repository_policy_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . delete_repository_policy ( repositoryName = ECR_REPO_NOT_EXIST )
2021-08-07 07:48:28 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteRepositoryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-07 07:48:28 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-07 07:48:28 +00:00
def test_delete_repository_policy_error_policy_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . delete_repository_policy ( repositoryName = ECR_REPO )
2021-08-07 07:48:28 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteRepositoryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryPolicyNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " Repository policy does not exist for the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-07 07:48:28 +00:00
)
2021-08-09 13:55:29 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_put_lifecycle_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
policy = {
" rules " : [
{
" rulePriority " : 1 ,
" description " : " test policy " ,
" selection " : {
" tagStatus " : " untagged " ,
" countType " : " imageCountMoreThan " ,
" countNumber " : 30 ,
} ,
" action " : { " type " : " expire " } ,
}
]
}
# when
response = client . put_lifecycle_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , lifecyclePolicyText = json . dumps ( policy )
2021-08-09 13:55:29 +00:00
)
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert json . loads ( response [ " lifecyclePolicyText " ] ) == policy
2021-08-09 13:55:29 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_put_lifecycle_policy_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
policy = {
" rules " : [
{
" rulePriority " : 1 ,
" description " : " test policy " ,
" selection " : {
" tagStatus " : " untagged " ,
" countType " : " imageCountMoreThan " ,
" countNumber " : 30 ,
} ,
" action " : { " type " : " expire " } ,
}
]
}
# when
with pytest . raises ( ClientError ) as e :
client . put_lifecycle_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO_NOT_EXIST , lifecyclePolicyText = json . dumps ( policy )
2021-08-09 13:55:29 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutLifecyclePolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-09 13:55:29 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_get_lifecycle_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
policy = {
" rules " : [
{
" rulePriority " : 1 ,
" description " : " test policy " ,
" selection " : {
" tagStatus " : " untagged " ,
" countType " : " imageCountMoreThan " ,
" countNumber " : 30 ,
} ,
" action " : { " type " : " expire " } ,
}
]
}
client . put_lifecycle_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , lifecyclePolicyText = json . dumps ( policy )
2021-08-09 13:55:29 +00:00
)
# when
2023-11-07 18:29:35 +00:00
response = client . get_lifecycle_policy ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert json . loads ( response [ " lifecyclePolicyText " ] ) == policy
assert isinstance ( response [ " lastEvaluatedAt " ] , datetime )
2021-08-09 13:55:29 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_get_lifecycle_policy_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . get_lifecycle_policy ( repositoryName = ECR_REPO_NOT_EXIST )
2021-08-09 13:55:29 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " GetLifecyclePolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-09 13:55:29 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_get_lifecycle_policy_error_policy_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . get_lifecycle_policy ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " GetLifecyclePolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " LifecyclePolicyNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " Lifecycle policy does not exist for the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-09 13:55:29 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_delete_lifecycle_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
policy = {
" rules " : [
{
" rulePriority " : 1 ,
" description " : " test policy " ,
" selection " : {
" tagStatus " : " untagged " ,
" countType " : " imageCountMoreThan " ,
" countNumber " : 30 ,
} ,
" action " : { " type " : " expire " } ,
}
]
}
client . put_lifecycle_policy (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , lifecyclePolicyText = json . dumps ( policy )
2021-08-09 13:55:29 +00:00
)
# when
2023-11-07 18:29:35 +00:00
response = client . delete_lifecycle_policy ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert json . loads ( response [ " lifecyclePolicyText " ] ) == policy
assert isinstance ( response [ " lastEvaluatedAt " ] , datetime )
2021-08-09 13:55:29 +00:00
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . get_lifecycle_policy ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
2023-07-17 10:21:32 +00:00
assert e . value . response [ " Error " ] [ " Code " ] == " LifecyclePolicyNotFoundException "
2021-08-09 13:55:29 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_delete_lifecycle_policy_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . delete_lifecycle_policy ( repositoryName = ECR_REPO_NOT_EXIST )
2021-08-09 13:55:29 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteLifecyclePolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-09 13:55:29 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-09 13:55:29 +00:00
def test_delete_lifecycle_policy_error_policy_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
2023-11-07 18:29:35 +00:00
client . create_repository ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
# when
with pytest . raises ( ClientError ) as e :
2023-11-07 18:29:35 +00:00
client . delete_lifecycle_policy ( repositoryName = ECR_REPO )
2021-08-09 13:55:29 +00:00
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteLifecyclePolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " LifecyclePolicyNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " Lifecycle policy does not exist for the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-09 13:55:29 +00:00
)
2021-08-11 12:18:12 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2022-05-19 11:08:02 +00:00
@pytest.mark.parametrize (
" actions " ,
[ " ecr:CreateRepository " , [ " ecr:CreateRepository " , " ecr:ReplicateImage " ] ] ,
ids = [ " single-action " , " multiple-actions " ] ,
)
def test_put_registry_policy ( actions ) :
2021-08-11 12:18:12 +00:00
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-11 12:18:12 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Effect " : " Allow " ,
" Principal " : {
" AWS " : [ " arn:aws:iam::111111111111:root " , " 222222222222 " ]
} ,
2022-05-19 11:08:02 +00:00
" Action " : actions ,
2021-08-11 12:18:12 +00:00
" Resource " : " * " ,
}
] ,
}
# when
response = client . put_registry_policy ( policyText = json . dumps ( policy ) )
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
assert json . loads ( response [ " policyText " ] ) == policy
2021-08-11 12:18:12 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-11 12:18:12 +00:00
def test_put_registry_policy_error_invalid_action ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-11 12:18:12 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Effect " : " Allow " ,
" Principal " : { " AWS " : " arn:aws:iam::111111111111:root " } ,
" Action " : [
" ecr:CreateRepository " ,
" ecr:ReplicateImage " ,
" ecr:DescribeRepositories " ,
] ,
" Resource " : " * " ,
}
] ,
}
# when
with pytest . raises ( ClientError ) as e :
client . put_registry_policy ( policyText = json . dumps ( policy ) )
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutRegistryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " InvalidParameterException "
assert (
ex . response [ " Error " ] [ " Message " ]
== " Invalid parameter at ' PolicyText ' failed to satisfy constraint: ' Invalid registry policy provided ' "
2021-08-11 12:18:12 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-11 12:18:12 +00:00
def test_get_registry_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-11 12:18:12 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Effect " : " Allow " ,
" Principal " : {
" AWS " : [ " arn:aws:iam::111111111111:root " , " 222222222222 " ]
} ,
" Action " : [ " ecr:CreateRepository " , " ecr:ReplicateImage " ] ,
" Resource " : " * " ,
}
] ,
}
client . put_registry_policy ( policyText = json . dumps ( policy ) )
# when
response = client . get_registry_policy ( )
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
assert json . loads ( response [ " policyText " ] ) == policy
2021-08-11 12:18:12 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-11 12:18:12 +00:00
def test_get_registry_policy_error_policy_not_exists ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-11 12:18:12 +00:00
# when
with pytest . raises ( ClientError ) as e :
client . get_registry_policy ( )
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " GetRegistryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RegistryPolicyNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
== f " Registry policy does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-11 12:18:12 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-11 12:18:12 +00:00
def test_delete_registry_policy ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-11 12:18:12 +00:00
policy = {
" Version " : " 2012-10-17 " ,
" Statement " : [
{
" Effect " : " Allow " ,
" Principal " : {
" AWS " : [ " arn:aws:iam::111111111111:root " , " 222222222222 " ]
} ,
" Action " : [ " ecr:CreateRepository " , " ecr:ReplicateImage " ] ,
" Resource " : " * " ,
}
] ,
}
client . put_registry_policy ( policyText = json . dumps ( policy ) )
# when
response = client . delete_registry_policy ( )
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
assert json . loads ( response [ " policyText " ] ) == policy
2021-08-11 12:18:12 +00:00
with pytest . raises ( ClientError ) as e :
client . get_registry_policy ( )
2023-07-17 10:21:32 +00:00
assert e . value . response [ " Error " ] [ " Code " ] == " RegistryPolicyNotFoundException "
2021-08-11 12:18:12 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-11 12:18:12 +00:00
def test_delete_registry_policy_error_policy_not_exists ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-11 12:18:12 +00:00
# when
with pytest . raises ( ClientError ) as e :
client . delete_registry_policy ( )
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DeleteRegistryPolicy "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RegistryPolicyNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
== f " Registry policy does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-11 12:18:12 +00:00
)
2021-08-12 05:06:21 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_start_image_scan ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_tag = " latest "
image_digest = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:06:21 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
# when
response = client . start_image_scan (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag }
2021-08-12 05:06:21 +00:00
)
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert response [ " imageId " ] == { " imageDigest " : image_digest , " imageTag " : image_tag }
assert response [ " imageScanStatus " ] == { " status " : " IN_PROGRESS " }
2021-08-12 05:06:21 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_start_image_scan_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . start_image_scan (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO_NOT_EXIST , imageId = { " imageTag " : " latest " }
2021-08-12 05:06:21 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " StartImageScan "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-12 05:06:21 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_start_image_scan_error_image_not_exists ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_tag = " not-exists "
# when
with pytest . raises ( ClientError ) as e :
client . start_image_scan (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag }
2021-08-12 05:06:21 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " StartImageScan "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " ImageNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The image with imageId {{ imageDigest: ' null ' , imageTag: ' { image_tag } ' }} does not exist within the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-12 05:06:21 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_start_image_scan_error_image_tag_digest_mismatch ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_digest = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:06:21 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
image_tag = " not-latest "
# when
with pytest . raises ( ClientError ) as e :
client . start_image_scan (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:06:21 +00:00
imageId = { " imageTag " : image_tag , " imageDigest " : image_digest } ,
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " StartImageScan "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " ImageNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The image with imageId {{ imageDigest: ' { image_digest } ' , imageTag: ' { image_tag } ' }} does not exist within the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-12 05:06:21 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_start_image_scan_error_daily_limit ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_tag = " latest "
2021-10-18 19:44:29 +00:00
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:06:21 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
2021-10-18 19:44:29 +00:00
)
2023-11-07 18:29:35 +00:00
client . start_image_scan ( repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag } )
2021-08-12 05:06:21 +00:00
# when
with pytest . raises ( ClientError ) as e :
client . start_image_scan (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag }
2021-08-12 05:06:21 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " StartImageScan "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " LimitExceededException "
assert (
ex . response [ " Error " ] [ " Message " ]
== " The scan quota per image has been exceeded. Wait and try again. "
2021-08-12 05:06:21 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_describe_image_scan_findings ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_tag = " latest "
image_digest = client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:06:21 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = " latest " ,
) [ " image " ] [ " imageId " ] [ " imageDigest " ]
2023-11-07 18:29:35 +00:00
client . start_image_scan ( repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag } )
2021-08-12 05:06:21 +00:00
# when
response = client . describe_image_scan_findings (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag }
2021-08-12 05:06:21 +00:00
)
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
2023-11-07 18:29:35 +00:00
assert response [ " repositoryName " ] == ECR_REPO
2023-07-17 10:21:32 +00:00
assert response [ " imageId " ] == { " imageDigest " : image_digest , " imageTag " : image_tag }
assert response [ " imageScanStatus " ] == {
" status " : " COMPLETE " ,
" description " : " The scan was completed successfully. " ,
}
2021-08-12 05:06:21 +00:00
scan_findings = response [ " imageScanFindings " ]
2023-07-17 10:21:32 +00:00
assert isinstance ( scan_findings [ " imageScanCompletedAt " ] , datetime )
assert isinstance ( scan_findings [ " vulnerabilitySourceUpdatedAt " ] , datetime )
assert scan_findings [ " findings " ] == [
{
" name " : " CVE-9999-9999 " ,
" uri " : " https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-9999-9999 " ,
" severity " : " HIGH " ,
" attributes " : [
{ " key " : " package_version " , " value " : " 9.9.9 " } ,
{ " key " : " package_name " , " value " : " moto_fake " } ,
{ " key " : " CVSS2_VECTOR " , " value " : " AV:N/AC:L/Au:N/C:P/I:P/A:P " } ,
{ " key " : " CVSS2_SCORE " , " value " : " 7.5 " } ,
] ,
}
]
assert scan_findings [ " findingSeverityCounts " ] == { " HIGH " : 1 }
2021-08-12 05:06:21 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_describe_image_scan_findings_error_repo_not_exists ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
# when
with pytest . raises ( ClientError ) as e :
client . describe_image_scan_findings (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO_NOT_EXIST , imageId = { " imageTag " : " latest " }
2021-08-12 05:06:21 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DescribeImageScanFindings "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " RepositoryNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The repository with name ' { ECR_REPO_NOT_EXIST } ' does not exist in the registry with id ' { ACCOUNT_ID } ' "
2021-08-12 05:06:21 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_describe_image_scan_findings_error_image_not_exists ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_tag = " not-exists "
# when
with pytest . raises ( ClientError ) as e :
client . describe_image_scan_findings (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag }
2021-08-12 05:06:21 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DescribeImageScanFindings "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " ImageNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " The image with imageId {{ imageDigest: ' null ' , imageTag: ' { image_tag } ' }} does not exist within the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-12 05:06:21 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-12 05:06:21 +00:00
def test_describe_image_scan_findings_error_scan_not_exists ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
client . create_repository ( repositoryName = ECR_REPO )
2021-08-12 05:06:21 +00:00
image_tag = " latest "
client . put_image (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO ,
2021-08-12 05:06:21 +00:00
imageManifest = json . dumps ( _create_image_manifest ( ) ) ,
imageTag = image_tag ,
)
# when
with pytest . raises ( ClientError ) as e :
client . describe_image_scan_findings (
2023-11-07 18:29:35 +00:00
repositoryName = ECR_REPO , imageId = { " imageTag " : image_tag }
2021-08-12 05:06:21 +00:00
)
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " DescribeImageScanFindings "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " ScanNotFoundException "
assert (
ex . response [ " Error " ] [ " Message " ]
2023-11-07 18:29:35 +00:00
== f " Image scan does not exist for the image with ' {{ imageDigest: ' null ' , imageTag: ' { image_tag } ' }} ' in the repository with name ' { ECR_REPO } ' in the registry with id ' { ACCOUNT_ID } ' "
2021-08-12 05:06:21 +00:00
)
2021-08-14 15:15:56 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-14 15:15:56 +00:00
def test_put_replication_configuration ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-14 15:15:56 +00:00
config = {
" rules " : [ { " destinations " : [ { " region " : " eu-west-1 " , " registryId " : ACCOUNT_ID } ] } ]
}
# when
response = client . put_replication_configuration ( replicationConfiguration = config )
# then
2023-07-17 10:21:32 +00:00
assert response [ " replicationConfiguration " ] == config
2021-08-14 15:15:56 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-14 15:15:56 +00:00
def test_put_replication_configuration_error_feature_disabled ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-14 15:15:56 +00:00
config = {
" rules " : [
{
" destinations " : [
{ " region " : " eu-central-1 " , " registryId " : " 111111111111 " } ,
]
} ,
{
" destinations " : [
{ " region " : " eu-central-1 " , " registryId " : " 222222222222 " } ,
]
} ,
]
}
# when
with pytest . raises ( ClientError ) as e :
client . put_replication_configuration ( replicationConfiguration = config )
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutReplicationConfiguration "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " ValidationException "
assert ex . response [ " Error " ] [ " Message " ] == " This feature is disabled "
2021-08-14 15:15:56 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-14 15:15:56 +00:00
def test_put_replication_configuration_error_same_source ( ) :
# given
region_name = " eu-central-1 "
client = boto3 . client ( " ecr " , region_name = region_name )
config = {
" rules " : [
{ " destinations " : [ { " region " : region_name , " registryId " : ACCOUNT_ID } ] } ,
]
}
# when
with pytest . raises ( ClientError ) as e :
client . put_replication_configuration ( replicationConfiguration = config )
# then
ex = e . value
2023-07-17 10:21:32 +00:00
assert ex . operation_name == " PutReplicationConfiguration "
assert ex . response [ " ResponseMetadata " ] [ " HTTPStatusCode " ] == 400
assert ex . response [ " Error " ] [ " Code " ] == " InvalidParameterException "
assert (
ex . response [ " Error " ] [ " Message " ]
== " Invalid parameter at ' replicationConfiguration ' failed to satisfy constraint: ' Replication destination cannot be the same as the source registry ' "
2021-08-14 15:15:56 +00:00
)
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-14 15:15:56 +00:00
def test_describe_registry ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-14 15:15:56 +00:00
# when
response = client . describe_registry ( )
# then
2023-07-17 10:21:32 +00:00
assert response [ " registryId " ] == ACCOUNT_ID
assert response [ " replicationConfiguration " ] == { " rules " : [ ] }
2021-08-14 15:15:56 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2021-08-14 15:15:56 +00:00
def test_describe_registry_after_update ( ) :
# given
2023-11-07 18:29:35 +00:00
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
2021-08-14 15:15:56 +00:00
config = {
" rules " : [
{ " destinations " : [ { " region " : " eu-west-1 " , " registryId " : ACCOUNT_ID } ] } ,
]
}
client . put_replication_configuration ( replicationConfiguration = config )
# when
response = client . describe_registry ( )
# then
2023-07-17 10:21:32 +00:00
assert response [ " replicationConfiguration " ] == config
2023-11-07 18:29:35 +00:00
2024-01-07 12:03:33 +00:00
@mock_aws
2023-11-07 18:29:35 +00:00
def test_ecr_image_digest ( ) :
# given
client = boto3 . client ( " ecr " , region_name = ECR_REGION )
digest = " sha256:826b6832e45ba17d625debc95ae8554e148550b00c05b47fa8f7be1c555bc83c "
client . create_repository ( repositoryName = ECR_REPO )
image_manifest = {
" config " : {
" digest " : " sha256:6442bc26a7c562f5afe6467dab36365c709909f6a81afcecfc0c25cff0f1bab0 " ,
" mediaType " : " application/vnd.docker.container.image.v1+json " ,
" size " : 5205 ,
} ,
" layers " : [
{
" digest " : " sha256:b35e87b5838011a3637be660e4238af9a55e4edc74404c990f7a558e7f416658 " ,
" mediaType " : " application/vnd.docker.image.rootfs.diff.tar.gzip " ,
" size " : 26690191 ,
}
] ,
" mediaType " : " application/vnd.docker.distribution.manifest.v2+json " ,
" schemaVersion " : 2 ,
}
# when
put_response = client . put_image (
repositoryName = ECR_REPO ,
imageManifest = json . dumps ( image_manifest ) ,
imageManifestMediaType = " application/vnd.oci.image.manifest.v1+json " ,
imageTag = " test1 " ,
imageDigest = digest ,
)
describe_response = client . describe_images ( repositoryName = ECR_REPO )
# then
assert put_response [ " image " ] [ " imageId " ] [ " imageDigest " ] == digest
assert describe_response [ " imageDetails " ] [ 0 ] [ " imageDigest " ] == digest