Adding ECR

This commit is contained in:
Mike Bjerkness 2017-05-22 15:49:59 -05:00
parent 584352aaf6
commit 588e211c71
5 changed files with 311 additions and 0 deletions

View File

@ -14,6 +14,7 @@ from .datapipeline import mock_datapipeline, mock_datapipeline_deprecated # fla
from .dynamodb import mock_dynamodb, mock_dynamodb_deprecated # flake8: noqa
from .dynamodb2 import mock_dynamodb2, mock_dynamodb2_deprecated # flake8: noqa
from .ec2 import mock_ec2, mock_ec2_deprecated # flake8: noqa
from .ecr import mock_ecr, mock_ecr_deprecated # flake8: noqa
from .ecs import mock_ecs, mock_ecs_deprecated # flake8: noqa
from .elb import mock_elb, mock_elb_deprecated # flake8: noqa
from .emr import mock_emr, mock_emr_deprecated # flake8: noqa

7
moto/ecr/__init__.py Normal file
View File

@ -0,0 +1,7 @@
from __future__ import unicode_literals
from .models import ecr_backends
from ..core.models import base_decorator, deprecated_base_decorator
ecr_backend = ecr_backends['us-east-1']
mock_ecr = base_decorator(ecr_backends)
mock_ecr_deprecated = deprecated_base_decorator(ecr_backends)

221
moto/ecr/models.py Normal file
View File

@ -0,0 +1,221 @@
from __future__ import unicode_literals
# from datetime import datetime
from random import random
from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends
from copy import copy
import hashlib
class BaseObject(BaseModel):
def camelCase(self, key):
words = []
for i, word in enumerate(key.split('_')):
if i > 0:
words.append(word.title())
else:
words.append(word)
return ''.join(words)
def gen_response_object(self):
response_object = copy(self.__dict__)
for key, value in response_object.items():
if '_' in key:
response_object[self.camelCase(key)] = value
del response_object[key]
return response_object
@property
def response_object(self):
return self.gen_response_object()
class Repository(BaseObject):
def __init__(self, repository_name):
self.arn = 'arn:aws:ecr:us-east-1:012345678910:repository/{0}'.format(
repository_name)
self.name = repository_name
# self.created = datetime.utcnow()
self.uri = '012345678910.dkr.ecr.us-east-1.amazonaws.com/{0}'.format(
repository_name
)
self.registry_id = '012345678910'
self.images = []
@property
def physical_resource_id(self):
return self.name
@property
def response_object(self):
response_object = self.gen_response_object()
response_object['registryId'] = self.registry_id
response_object['repositoryArn'] = self.arn
response_object['repositoryName'] = self.name
response_object['repositoryUri'] = self.uri
# response_object['createdAt'] = self.created
del response_object['arn'], response_object['name']
return response_object
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ecr_backend = ecr_backends[region_name]
return ecr_backend.create_repository(
# RepositoryName is optional in CloudFormation, thus create a random
# name if necessary
repository_name=properties.get(
'RepositoryName', 'ecrrepository{0}'.format(int(random() * 10 ** 6))),
)
@classmethod
def update_from_cloudformation_json(cls, original_resource, new_resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
if original_resource.name != properties['RepositoryName']:
ecr_backend = ecr_backends[region_name]
ecr_backend.delete_cluster(original_resource.arn)
return ecr_backend.create_repository(
# RepositoryName is optional in CloudFormation, thus create a
# random name if necessary
repository_name=properties.get(
'RepositoryName', 'RepositoryName{0}'.format(int(random() * 10 ** 6))),
)
else:
# no-op when nothing changed between old and new resources
return original_resource
class Image(BaseObject):
def __init__(self, tag, manifest, repository, registry_id="012345678910"):
self.image_tag = tag
self.image_manifest = manifest
self.image_size_in_bytes = 50 * 1024 * 1024
self.repository = repository
self.registry_id = registry_id
self.image_digest = None
self.image_pushed_at = None
def _create_digest(self):
image_contents = 'docker_image{0}'.format(int(random() * 10 ** 6))
self.image_digest = "sha256:%s" % hashlib.sha256(image_contents).hexdigest()
def get_image_digest(self):
if not self.image_digest:
self._create_digest()
return self.image_digest
@property
def response_object(self):
response_object = self.gen_response_object()
response_object['imageId'] = {}
response_object['imageId']['imageTag'] = self.image_tag
response_object['imageId']['imageDigest'] = self.get_image_digest()
response_object['imageManifest'] = self.image_manifest
response_object['repositoryName'] = self.repository
response_object['registryId'] = self.registry_id
return response_object
@property
def response_list_object(self):
response_object = self.gen_response_object()
response_object['imageTag'] = self.image_tag
response_object['imageDigest'] = "i don't know"
return response_object
@property
def response_describe_object(self):
response_object = self.gen_response_object()
response_object['imageTags'] = [self.image_tag]
response_object['imageDigest'] = self.get_image_digest()
response_object['imageManifest'] = self.image_manifest
response_object['repositoryName'] = self.repository
response_object['registryId'] = self.registry_id
response_object['imageSizeInBytes'] = self.image_size_in_bytes
response_object['imagePushedAt'] = '2017-05-09'
return response_object
class ECRBackend(BaseBackend):
def __init__(self):
self.repositories = {}
def describe_repositories(self, registry_id=None, repository_names=None):
"""
maxResults and nextToken not implemented
"""
repositories = []
for repository in self.repositories.values():
# If a registry_id was supplied, ensure this repository matches
if registry_id:
if repository.registry_id != registry_id:
continue
# If a list of repository names was supplied, esure this repository
# is in that list
if repository_names:
if repository.name not in repository_names:
continue
repositories.append(repository.response_object)
return repositories
def create_repository(self, repository_name):
repository = Repository(repository_name)
self.repositories[repository_name] = repository
return repository
def delete_repository(self, respository_name, registry_id=None):
if respository_name in self.repositories:
return self.repositories.pop(respository_name)
else:
raise Exception("{0} is not a repository".format(respository_name))
def list_images(self, repository_name, registry_id=None):
"""
maxResults and filtering not implemented
"""
images = []
for repository in self.repositories.values():
if repository_name:
if repository.name != repository_name:
continue
if registry_id:
if repository.registry_id != registry_id:
continue
for image in repository.images:
images.append(image)
return images
def describe_images(self, repository_name, registry_id=None, image_id=None):
if repository_name in self.repositories:
repository = self.repositories[repository_name]
else:
raise Exception("{0} is not a repository".format(repository_name))
response = []
for image in repository.images:
response.append(image)
return response
def put_image(self, repository_name, image_manifest, image_tag):
if repository_name in self.repositories:
repository = self.repositories[repository_name]
else:
raise Exception("{0} is not a repository".format(repository_name))
image = Image(image_tag, image_manifest, repository_name)
repository.images.append(image)
return image
ecr_backends = {}
for region, ec2_backend in ec2_backends.items():
ecr_backends[region] = ECRBackend()

72
moto/ecr/responses.py Normal file
View File

@ -0,0 +1,72 @@
from __future__ import unicode_literals
import json
from moto.core.responses import BaseResponse
from .models import ecr_backends
class ECRResponse(BaseResponse):
@property
def ecr_backend(self):
return ecr_backends[self.region]
@property
def request_params(self):
try:
return json.loads(self.body)
except ValueError:
return {}
def _get_param(self, param):
return self.request_params.get(param, None)
def create_repository(self):
repository_name = self._get_param('repositoryName')
if repository_name is None:
repository_name = 'default'
repository = self.ecr_backend.create_repository(repository_name)
return json.dumps({
'repository': repository.response_object
})
def describe_repositories(self):
describe_repositories_name = self._get_param('repositoryNames')
repositories = self.ecr_backend.describe_repositories(describe_repositories_name)
return json.dumps({
'repositories': repositories,
'failures': []
})
def delete_repository(self):
repository_str = self._get_param('repositoryName')
repository = self.ecr_backend.delete_repository(repository_str)
return json.dumps({
'repository': repository.response_object
})
def put_image(self):
repository_str = self._get_param('repositoryName')
image_manifest = self._get_param('imageManifest')
image_tag = self._get_param('imageTag')
image = self.ecr_backend.put_image(repository_str, image_manifest, image_tag)
return json.dumps({
'image': image.response_object
})
def list_images(self):
repository_str = self._get_param('repositoryName')
registry_id = self._get_param('registryId')
images = self.ecr_backend.list_images(repository_str, registry_id)
return json.dumps({
'imageIds': [image.response_list_object for image in images],
})
def describe_images(self):
repository_str = self._get_param('repositoryName')
registry_id = self._get_param('registryId')
images = self.ecr_backend.describe_images(repository_str, registry_id)
return json.dumps({
'imageDetails': [image.response_describe_object for image in images],
})

10
moto/ecr/urls.py Normal file
View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import ECRResponse
url_bases = [
"https?://ecr.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': ECRResponse.dispatch,
}