Add batch_get_image support for ECR (#1406)

* Add batch_get_image for ECR

* Add tests for batch_get_image

* Add tests for batch_get_image

* Undo local commits

* Undo local commits

* Adding object representation for batch_get_image

* Update responses. Add a couple more tests.
This commit is contained in:
Mike Bjerkness 2017-12-30 20:39:23 -06:00 committed by Jack Danger
parent 84f2ec5e04
commit b855fee2e4
3 changed files with 170 additions and 8 deletions

View File

@ -1,14 +1,14 @@
from __future__ import unicode_literals
# from datetime import datetime
import hashlib
from copy import copy
from random import random
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
from copy import copy
import hashlib
from moto.ecr.exceptions import ImageNotFoundException, RepositoryNotFoundException
from botocore.exceptions import ParamValidationError
DEFAULT_REGISTRY_ID = '012345678910'
@ -145,6 +145,17 @@ class Image(BaseObject):
response_object['imagePushedAt'] = '2017-05-09'
return response_object
@property
def response_batch_get_image(self):
response_object = {}
response_object['imageId'] = {}
response_object['imageId']['imageTag'] = self.image_tag
response_object['imageId']['imageDigest'] = self.get_image_digest()
response_object['imageManifest'] = self.image_manifest
response_object['repositoryName'] = self.repository
response_object['registryId'] = self.registry_id
return response_object
class ECRBackend(BaseBackend):
@ -245,6 +256,39 @@ class ECRBackend(BaseBackend):
repository.images.append(image)
return image
def batch_get_image(self, repository_name, registry_id=None, image_ids=None, accepted_media_types=None):
if repository_name in self.repositories:
repository = self.repositories[repository_name]
else:
raise RepositoryNotFoundException(repository_name, registry_id or DEFAULT_REGISTRY_ID)
if not image_ids:
raise ParamValidationError(msg='Missing required parameter in input: "imageIds"')
response = {
'images': [],
'failures': [],
}
for image_id in image_ids:
found = False
for image in repository.images:
if (('imageDigest' in image_id and image.get_image_digest() == image_id['imageDigest']) or
('imageTag' in image_id and image.image_tag == image_id['imageTag'])):
found = True
response['images'].append(image.response_batch_get_image)
if not found:
response['failures'].append({
'imageId': {
'imageTag': image_id.get('imageTag', 'null')
},
'failureCode': 'ImageNotFound',
'failureReason': 'Requested image not found'
})
return response
ecr_backends = {}
for region, ec2_backend in ec2_backends.items():

View File

@ -89,9 +89,13 @@ class ECRResponse(BaseResponse):
'ECR.batch_delete_image is not yet implemented')
def batch_get_image(self):
if self.is_not_dryrun('BatchGetImage'):
raise NotImplementedError(
'ECR.batch_get_image is not yet implemented')
repository_str = self._get_param('repositoryName')
registry_id = self._get_param('registryId')
image_ids = self._get_param('imageIds')
accepted_media_types = self._get_param('acceptedMediaTypes')
response = self.ecr_backend.batch_get_image(repository_str, registry_id, image_ids, accepted_media_types)
return json.dumps(response)
def can_paginate(self):
if self.is_not_dryrun('CanPaginate'):

View File

@ -9,7 +9,7 @@ import re
import sure # noqa
import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import ClientError, ParamValidationError
from dateutil.tz import tzlocal
from moto import mock_ecr
@ -445,3 +445,117 @@ def test_get_authorization_token_explicit_regions():
}
])
@mock_ecr
def test_batch_get_image():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='latest'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='v1'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='v2'
)
response = client.batch_get_image(
repositoryName='test_repository',
imageIds=[
{
'imageTag': 'v2'
},
],
)
type(response['images']).should.be(list)
len(response['images']).should.be(1)
response['images'][0]['imageManifest'].should.contain("vnd.docker.distribution.manifest.v2+json")
response['images'][0]['registryId'].should.equal("012345678910")
response['images'][0]['repositoryName'].should.equal("test_repository")
response['images'][0]['imageId']['imageTag'].should.equal("v2")
response['images'][0]['imageId']['imageDigest'].should.contain("sha")
type(response['failures']).should.be(list)
len(response['failures']).should.be(0)
@mock_ecr
def test_batch_get_image_that_doesnt_exist():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='latest'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='v1'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='v2'
)
response = client.batch_get_image(
repositoryName='test_repository',
imageIds=[
{
'imageTag': 'v5'
},
],
)
type(response['images']).should.be(list)
len(response['images']).should.be(0)
type(response['failures']).should.be(list)
len(response['failures']).should.be(1)
response['failures'][0]['failureReason'].should.equal("Requested image not found")
response['failures'][0]['failureCode'].should.equal("ImageNotFound")
response['failures'][0]['imageId']['imageTag'].should.equal("v5")
@mock_ecr
def test_batch_get_image_no_tags():
client = boto3.client('ecr', region_name='us-east-1')
_ = client.create_repository(
repositoryName='test_repository'
)
_ = client.put_image(
repositoryName='test_repository',
imageManifest=json.dumps(_create_image_manifest()),
imageTag='latest'
)
error_msg = re.compile(
r".*Missing required parameter in input: \"imageIds\".*",
re.MULTILINE)
client.batch_get_image.when.called_with(
repositoryName='test_repository').should.throw(
ParamValidationError, error_msg)