Added support for filtering AMIs by self (#1398)

* Added support for filtering AMIs by self

Closes: https://github.com/spulec/moto/issues/1396

* Adjusted regex to also match signature v4 and fixed py3 compatibility
This commit is contained in:
Nuwan Goonasekera 2018-01-04 15:31:17 +05:30 committed by Terry Cain
parent 71af9317f2
commit 56ce26a728
4 changed files with 44 additions and 6 deletions

View File

@ -108,6 +108,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
# to extract region, use [^.]
region_regex = re.compile(r'\.(?P<region>[a-z]{2}-[a-z]+-\d{1})\.amazonaws\.com')
param_list_regex = re.compile(r'(.*)\.(\d+)\.')
access_key_regex = re.compile(r'AWS.*(?P<access_key>(?<![A-Z0-9])[A-Z0-9]{20}(?![A-Z0-9]))[:/]')
aws_service_spec = None
@classmethod
@ -178,6 +179,21 @@ class BaseResponse(_TemplateEnvironmentMixin):
region = self.default_region
return region
def get_current_user(self):
"""
Returns the access key id used in this request as the current user id
"""
if 'Authorization' in self.headers:
match = self.access_key_regex.search(self.headers['Authorization'])
if match:
return match.group(1)
if self.querystring.get('AWSAccessKeyId'):
return self.querystring.get('AWSAccessKeyId')
else:
# Should we raise an unauthorized exception instead?
return None
def _dispatch(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
return self.call_action()

View File

@ -1033,7 +1033,6 @@ class TagBackend(object):
class Ami(TaggedEC2Resource):
def __init__(self, ec2_backend, ami_id, instance=None, source_ami=None,
name=None, description=None, owner_id=None,
public=False, virtualization_type=None, architecture=None,
state='available', creation_date=None, platform=None,
image_type='machine', image_location=None, hypervisor=None,
@ -1138,12 +1137,14 @@ class AmiBackend(object):
ami_id = ami['ami_id']
self.amis[ami_id] = Ami(self, **ami)
def create_image(self, instance_id, name=None, description=None, owner_id=None):
def create_image(self, instance_id, name=None, description=None,
context=None):
# TODO: check that instance exists and pull info from it.
ami_id = random_ami_id()
instance = self.get_instance(instance_id)
ami = Ami(self, ami_id, instance=instance, source_ami=None,
name=name, description=description, owner_id=owner_id)
name=name, description=description,
owner_id=context.get_current_user() if context else None)
self.amis[ami_id] = ami
return ami
@ -1156,7 +1157,8 @@ class AmiBackend(object):
self.amis[ami_id] = ami
return ami
def describe_images(self, ami_ids=(), filters=None, exec_users=None, owners=None):
def describe_images(self, ami_ids=(), filters=None, exec_users=None, owners=None,
context=None):
images = self.amis.values()
# Limit images by launch permissions
@ -1170,6 +1172,11 @@ class AmiBackend(object):
# Limit by owner ids
if owners:
# support filtering by Owners=['self']
owners = list(map(
lambda o: context.get_current_user()
if context and o == 'self' else o,
owners))
images = [ami for ami in images if ami.owner_id in owners]
if ami_ids:

View File

@ -11,7 +11,7 @@ class AmisResponse(BaseResponse):
instance_id = self._get_param('InstanceId')
if self.is_not_dryrun('CreateImage'):
image = self.ec2_backend.create_image(
instance_id, name, description)
instance_id, name, description, context=self)
template = self.response_template(CREATE_IMAGE_RESPONSE)
return template.render(image=image)
@ -39,7 +39,8 @@ class AmisResponse(BaseResponse):
owners = self._get_multi_param('Owner')
exec_users = self._get_multi_param('ExecutableBy')
images = self.ec2_backend.describe_images(
ami_ids=ami_ids, filters=filters, exec_users=exec_users, owners=owners)
ami_ids=ami_ids, filters=filters, exec_users=exec_users,
owners=owners, context=self)
template = self.response_template(DESCRIBE_IMAGES_RESPONSE)
return template.render(images=images)

View File

@ -705,3 +705,17 @@ def test_ami_filter_by_owner_id():
assert all(ubuntu_ids) and ubuntu_ids[0] == ubuntu_id
# Check we actually have a subset of images
assert len(ubuntu_ids) < len(all_ids)
@mock_ec2
def test_ami_filter_by_self():
client = boto3.client('ec2', region_name='us-east-1')
my_images = client.describe_images(Owners=['self'])
assert len(my_images) == 0
# Create a new image
instance = ec2.create_instances(ImageId='ami-1234abcd', MinCount=1, MaxCount=1)[0]
image = instance.create_image(Name='test-image')
my_images = client.describe_images(Owners=['self'])
assert len(my_images) == 1