Merge pull request #1 from spulec/master

updating with upstream changes
This commit is contained in:
George Ionita 2018-01-14 19:08:16 +02:00 committed by GitHub
commit baba13c9bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 1296 additions and 53 deletions

7
.bumpversion.cfg Normal file
View File

@ -0,0 +1,7 @@
[bumpversion]
current_version = 1.2.0
[bumpversion:file:setup.py]
[bumpversion:file:moto/__init__.py]

View File

@ -1,11 +1,18 @@
Moto Changelog Moto Changelog
=================== ===================
Latest 1.2.0
------ ------
* Supports filtering AMIs by self
* Implemented signal_workflow_execution for SWF * Implemented signal_workflow_execution for SWF
* Wired SWF backend to the moto server * Wired SWF backend to the moto server
* Fixed incorrect handling of task list parameter on start_workflow_execution * Revamped lambda function storage to do versioning
* IOT improvements
* RDS improvements
* Implemented CloudWatch get_metric_statistics
* Improved Cloudformation EC2 support
* Implemented Cloudformation change_set endpoints
1.1.25 1.1.25
----- -----

View File

@ -1,4 +1,25 @@
### Contributing code ### Contributing code
If you have improvements to Moto, send us your pull requests! For those Moto has a [Code of Conduct](https://github.com/spulec/moto/blob/master/CODE_OF_CONDUCT.md), you can expect to be treated with respect at all times when interacting with this project.
just getting started, Github has a [howto](https://help.github.com/articles/using-pull-requests/).
## Is there a missing feature?
Moto is easier to contribute to than you probably think. There's [a list of which endpoints have been implemented](https://github.com/spulec/moto/blob/master/IMPLEMENTATION_COVERAGE.md) and we invite you to add new endpoints to existing services or to add new services.
How to teach Moto to support a new AWS endpoint:
* Create an issue describing what's missing. This is where we'll all talk about the new addition and help you get it done.
* Create a [pull request](https://help.github.com/articles/using-pull-requests/) and mention the issue # in the PR description.
* Try to add a failing test case. For example, if you're trying to implement `boto3.client('acm').import_certificate()` you'll want to add a new method called `def test_import_certificate` to `tests/test_acm/test_acm.py`.
* If you can also implement the code that gets that test passing that's great. If not, just ask the community for a hand and somebody will assist you.
# Maintainers
## Releasing a new version of Moto
You'll need a PyPi account and a Dockerhub account to release Moto. After we release a new PyPi package we build and push the [motoserver/moto](https://hub.docker.com/r/motoserver/moto/) Docker image.
* First, `scripts/bump_version` modifies the version and opens a PR
* Then, merge the new pull request
* Finally, generate and ship the new artifacts with `make publish`

View File

@ -36,7 +36,7 @@ tag_github_release:
git tag `python setup.py --version` git tag `python setup.py --version`
git push origin `python setup.py --version` git push origin `python setup.py --version`
publish: implementation_coverage \ publish:
upload_pypi_artifact \ upload_pypi_artifact \
tag_github_release \ tag_github_release \
push_dockerhub_image push_dockerhub_image

View File

@ -3,7 +3,7 @@ import logging
# logging.getLogger('boto').setLevel(logging.CRITICAL) # logging.getLogger('boto').setLevel(logging.CRITICAL)
__title__ = 'moto' __title__ = 'moto'
__version__ = '1.0.1' __version__ = '1.2.0',
from .acm import mock_acm # flake8: noqa from .acm import mock_acm # flake8: noqa
from .apigateway import mock_apigateway, mock_apigateway_deprecated # flake8: noqa from .apigateway import mock_apigateway, mock_apigateway_deprecated # flake8: noqa

View File

@ -107,7 +107,8 @@ class FakeStack(BaseModel):
def update(self, template, role_arn=None, parameters=None, tags=None): def update(self, template, role_arn=None, parameters=None, tags=None):
self._add_stack_event("UPDATE_IN_PROGRESS", resource_status_reason="User Initiated") self._add_stack_event("UPDATE_IN_PROGRESS", resource_status_reason="User Initiated")
self.template = template self.template = template
self.resource_map.update(json.loads(template), parameters) self._parse_template()
self.resource_map.update(self.template_dict, parameters)
self.output_map = self._create_output_map() self.output_map = self._create_output_map()
self._add_stack_event("UPDATE_COMPLETE") self._add_stack_event("UPDATE_COMPLETE")
self.status = "UPDATE_COMPLETE" self.status = "UPDATE_COMPLETE"

View File

@ -108,6 +108,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
# to extract region, use [^.] # to extract region, use [^.]
region_regex = re.compile(r'\.(?P<region>[a-z]{2}-[a-z]+-\d{1})\.amazonaws\.com') region_regex = re.compile(r'\.(?P<region>[a-z]{2}-[a-z]+-\d{1})\.amazonaws\.com')
param_list_regex = re.compile(r'(.*)\.(\d+)\.') param_list_regex = re.compile(r'(.*)\.(\d+)\.')
access_key_regex = re.compile(r'AWS.*(?P<access_key>(?<![A-Z0-9])[A-Z0-9]{20}(?![A-Z0-9]))[:/]')
aws_service_spec = None aws_service_spec = None
@classmethod @classmethod
@ -178,6 +179,21 @@ class BaseResponse(_TemplateEnvironmentMixin):
region = self.default_region region = self.default_region
return region return region
def get_current_user(self):
    """
    Return the access key id used in this request as the current user id.
    """
    # Prefer the access key embedded in the signed Authorization header.
    if 'Authorization' in self.headers:
        found = self.access_key_regex.search(self.headers['Authorization'])
        if found:
            return found.group(1)
    # Fall back to the query-string access key (pre-signed style requests).
    key_from_querystring = self.querystring.get('AWSAccessKeyId')
    if key_from_querystring:
        return key_from_querystring
    # Should we raise an unauthorized exception instead?
    return None
def _dispatch(self, request, full_url, headers): def _dispatch(self, request, full_url, headers):
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
return self.call_action() return self.call_action()
@ -272,6 +288,9 @@ class BaseResponse(_TemplateEnvironmentMixin):
headers['status'] = str(headers['status']) headers['status'] = str(headers['status'])
return status, headers, body return status, headers, body
if not action:
return 404, headers, ''
raise NotImplementedError( raise NotImplementedError(
"The {0} action has not been implemented".format(action)) "The {0} action has not been implemented".format(action))

View File

@ -18,6 +18,8 @@ def camelcase_to_underscores(argument):
python underscore variable like the_new_attribute''' python underscore variable like the_new_attribute'''
result = '' result = ''
prev_char_title = True prev_char_title = True
if not argument:
return argument
for index, char in enumerate(argument): for index, char in enumerate(argument):
try: try:
next_char_title = argument[index + 1].istitle() next_char_title = argument[index + 1].istitle()

View File

@ -1033,7 +1033,6 @@ class TagBackend(object):
class Ami(TaggedEC2Resource): class Ami(TaggedEC2Resource):
def __init__(self, ec2_backend, ami_id, instance=None, source_ami=None, def __init__(self, ec2_backend, ami_id, instance=None, source_ami=None,
name=None, description=None, owner_id=None, name=None, description=None, owner_id=None,
public=False, virtualization_type=None, architecture=None, public=False, virtualization_type=None, architecture=None,
state='available', creation_date=None, platform=None, state='available', creation_date=None, platform=None,
image_type='machine', image_location=None, hypervisor=None, image_type='machine', image_location=None, hypervisor=None,
@ -1138,12 +1137,14 @@ class AmiBackend(object):
ami_id = ami['ami_id'] ami_id = ami['ami_id']
self.amis[ami_id] = Ami(self, **ami) self.amis[ami_id] = Ami(self, **ami)
def create_image(self, instance_id, name=None, description=None, owner_id=None): def create_image(self, instance_id, name=None, description=None,
context=None):
# TODO: check that instance exists and pull info from it. # TODO: check that instance exists and pull info from it.
ami_id = random_ami_id() ami_id = random_ami_id()
instance = self.get_instance(instance_id) instance = self.get_instance(instance_id)
ami = Ami(self, ami_id, instance=instance, source_ami=None, ami = Ami(self, ami_id, instance=instance, source_ami=None,
name=name, description=description, owner_id=owner_id) name=name, description=description,
owner_id=context.get_current_user() if context else None)
self.amis[ami_id] = ami self.amis[ami_id] = ami
return ami return ami
@ -1156,7 +1157,8 @@ class AmiBackend(object):
self.amis[ami_id] = ami self.amis[ami_id] = ami
return ami return ami
def describe_images(self, ami_ids=(), filters=None, exec_users=None, owners=None): def describe_images(self, ami_ids=(), filters=None, exec_users=None, owners=None,
context=None):
images = self.amis.values() images = self.amis.values()
# Limit images by launch permissions # Limit images by launch permissions
@ -1170,6 +1172,11 @@ class AmiBackend(object):
# Limit by owner ids # Limit by owner ids
if owners: if owners:
# support filtering by Owners=['self']
owners = list(map(
lambda o: context.get_current_user()
if context and o == 'self' else o,
owners))
images = [ami for ami in images if ami.owner_id in owners] images = [ami for ami in images if ami.owner_id in owners]
if ami_ids: if ami_ids:
@ -1261,8 +1268,15 @@ class RegionsAndZonesBackend(object):
(region, [Zone(region + c, region) for c in 'abc']) (region, [Zone(region + c, region) for c in 'abc'])
for region in [r.name for r in regions]) for region in [r.name for r in regions])
def describe_regions(self): def describe_regions(self, region_names=[]):
return self.regions if len(region_names) == 0:
return self.regions
ret = []
for name in region_names:
for region in self.regions:
if region.name == name:
ret.append(region)
return ret
def describe_availability_zones(self): def describe_availability_zones(self):
return self.zones[self.region_name] return self.zones[self.region_name]
@ -2004,6 +2018,11 @@ class VPC(TaggedEC2Resource):
cidr_block=properties['CidrBlock'], cidr_block=properties['CidrBlock'],
instance_tenancy=properties.get('InstanceTenancy', 'default') instance_tenancy=properties.get('InstanceTenancy', 'default')
) )
for tag in properties.get("Tags", []):
tag_key = tag["Key"]
tag_value = tag["Value"]
vpc.add_tag(tag_key, tag_value)
return vpc return vpc
@property @property

View File

@ -11,7 +11,7 @@ class AmisResponse(BaseResponse):
instance_id = self._get_param('InstanceId') instance_id = self._get_param('InstanceId')
if self.is_not_dryrun('CreateImage'): if self.is_not_dryrun('CreateImage'):
image = self.ec2_backend.create_image( image = self.ec2_backend.create_image(
instance_id, name, description) instance_id, name, description, context=self)
template = self.response_template(CREATE_IMAGE_RESPONSE) template = self.response_template(CREATE_IMAGE_RESPONSE)
return template.render(image=image) return template.render(image=image)
@ -39,7 +39,8 @@ class AmisResponse(BaseResponse):
owners = self._get_multi_param('Owner') owners = self._get_multi_param('Owner')
exec_users = self._get_multi_param('ExecutableBy') exec_users = self._get_multi_param('ExecutableBy')
images = self.ec2_backend.describe_images( images = self.ec2_backend.describe_images(
ami_ids=ami_ids, filters=filters, exec_users=exec_users, owners=owners) ami_ids=ami_ids, filters=filters, exec_users=exec_users,
owners=owners, context=self)
template = self.response_template(DESCRIBE_IMAGES_RESPONSE) template = self.response_template(DESCRIBE_IMAGES_RESPONSE)
return template.render(images=images) return template.render(images=images)

View File

@ -10,7 +10,8 @@ class AvailabilityZonesAndRegions(BaseResponse):
return template.render(zones=zones) return template.render(zones=zones)
def describe_regions(self): def describe_regions(self):
regions = self.ec2_backend.describe_regions() region_names = self._get_multi_param('RegionName')
regions = self.ec2_backend.describe_regions(region_names)
template = self.response_template(DESCRIBE_REGIONS_RESPONSE) template = self.response_template(DESCRIBE_REGIONS_RESPONSE)
return template.render(regions=regions) return template.render(regions=regions)

View File

@ -1,14 +1,14 @@
from __future__ import unicode_literals from __future__ import unicode_literals
# from datetime import datetime
import hashlib
from copy import copy
from random import random from random import random
from moto.core import BaseBackend, BaseModel from moto.core import BaseBackend, BaseModel
from moto.ec2 import ec2_backends from moto.ec2 import ec2_backends
from copy import copy
import hashlib
from moto.ecr.exceptions import ImageNotFoundException, RepositoryNotFoundException from moto.ecr.exceptions import ImageNotFoundException, RepositoryNotFoundException
from botocore.exceptions import ParamValidationError
DEFAULT_REGISTRY_ID = '012345678910' DEFAULT_REGISTRY_ID = '012345678910'
@ -145,6 +145,17 @@ class Image(BaseObject):
response_object['imagePushedAt'] = '2017-05-09' response_object['imagePushedAt'] = '2017-05-09'
return response_object return response_object
@property
def response_batch_get_image(self):
    """Shape this image as one entry of a BatchGetImage response."""
    return {
        'imageId': {
            'imageTag': self.image_tag,
            'imageDigest': self.get_image_digest(),
        },
        'imageManifest': self.image_manifest,
        'repositoryName': self.repository,
        'registryId': self.registry_id,
    }
class ECRBackend(BaseBackend): class ECRBackend(BaseBackend):
@ -245,6 +256,39 @@ class ECRBackend(BaseBackend):
repository.images.append(image) repository.images.append(image)
return image return image
def batch_get_image(self, repository_name, registry_id=None, image_ids=None, accepted_media_types=None):
    """Return detail for the requested image ids in a repository.

    Matches each requested id against the repository's images by digest or
    tag; unmatched ids are reported under 'failures' with ImageNotFound.
    Raises RepositoryNotFoundException for an unknown repository and
    ParamValidationError when image_ids is missing/empty.
    """
    try:
        repository = self.repositories[repository_name]
    except KeyError:
        raise RepositoryNotFoundException(repository_name, registry_id or DEFAULT_REGISTRY_ID)

    if not image_ids:
        raise ParamValidationError(msg='Missing required parameter in input: "imageIds"')

    response = {'images': [], 'failures': []}
    for requested in image_ids:
        matches = [
            img for img in repository.images
            if ('imageDigest' in requested and img.get_image_digest() == requested['imageDigest'])
            or ('imageTag' in requested and img.image_tag == requested['imageTag'])
        ]
        if matches:
            response['images'].extend(img.response_batch_get_image for img in matches)
        else:
            response['failures'].append({
                'imageId': {
                    'imageTag': requested.get('imageTag', 'null')
                },
                'failureCode': 'ImageNotFound',
                'failureReason': 'Requested image not found'
            })
    return response
ecr_backends = {} ecr_backends = {}
for region, ec2_backend in ec2_backends.items(): for region, ec2_backend in ec2_backends.items():

View File

@ -89,9 +89,13 @@ class ECRResponse(BaseResponse):
'ECR.batch_delete_image is not yet implemented') 'ECR.batch_delete_image is not yet implemented')
def batch_get_image(self): def batch_get_image(self):
if self.is_not_dryrun('BatchGetImage'): repository_str = self._get_param('repositoryName')
raise NotImplementedError( registry_id = self._get_param('registryId')
'ECR.batch_get_image is not yet implemented') image_ids = self._get_param('imageIds')
accepted_media_types = self._get_param('acceptedMediaTypes')
response = self.ecr_backend.batch_get_image(repository_str, registry_id, image_ids, accepted_media_types)
return json.dumps(response)
def can_paginate(self): def can_paginate(self):
if self.is_not_dryrun('CanPaginate'): if self.is_not_dryrun('CanPaginate'):

View File

@ -349,6 +349,14 @@ class User(BaseModel):
raise IAMNotFoundException( raise IAMNotFoundException(
"Key {0} not found".format(access_key_id)) "Key {0} not found".format(access_key_id))
def update_access_key(self, access_key_id, status):
    """Set the status of this user's access key with the given id.

    Raises IAMNotFoundException when the user has no such key.
    """
    matching = next(
        (k for k in self.access_keys if k.access_key_id == access_key_id),
        None)
    if matching is None:
        raise IAMNotFoundException("The Access Key with id {0} cannot be found".format(access_key_id))
    matching.status = status
def get_cfn_attribute(self, attribute_name): def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
if attribute_name == 'Arn': if attribute_name == 'Arn':
@ -817,6 +825,10 @@ class IAMBackend(BaseBackend):
key = user.create_access_key() key = user.create_access_key()
return key return key
def update_access_key(self, user_name, access_key_id, status):
    """Update the status of one of *user_name*'s access keys."""
    self.get_user(user_name).update_access_key(access_key_id, status)
def get_all_access_keys(self, user_name, marker=None, max_items=None): def get_all_access_keys(self, user_name, marker=None, max_items=None):
user = self.get_user(user_name) user = self.get_user(user_name)
keys = user.get_all_access_keys() keys = user.get_all_access_keys()

View File

@ -440,6 +440,14 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_ACCESS_KEY_TEMPLATE) template = self.response_template(CREATE_ACCESS_KEY_TEMPLATE)
return template.render(key=key) return template.render(key=key)
def update_access_key(self):
    """Handle the IAM UpdateAccessKey action (empty success response)."""
    iam_backend.update_access_key(
        self._get_param('UserName'),
        self._get_param('AccessKeyId'),
        self._get_param('Status'),
    )
    template = self.response_template(GENERIC_EMPTY_TEMPLATE)
    return template.render(name='UpdateAccessKey')
def list_access_keys(self): def list_access_keys(self):
user_name = self._get_param('UserName') user_name = self._get_param('UserName')

View File

@ -16,9 +16,17 @@ class ResourceNotFoundException(IoTClientError):
class InvalidRequestException(IoTClientError): class InvalidRequestException(IoTClientError):
def __init__(self): def __init__(self, msg=None):
self.code = 400 self.code = 400
super(InvalidRequestException, self).__init__( super(InvalidRequestException, self).__init__(
"InvalidRequestException", "InvalidRequestException",
"The request is not valid." msg or "The request is not valid."
)
class VersionConflictException(IoTClientError):
    """Raised (HTTP 409) when an expectedVersion does not match the current one."""

    def __init__(self, name):
        self.code = 409
        # Pass the error-type string first, matching the sibling
        # InvalidRequestException's call into IoTClientError.
        super(VersionConflictException, self).__init__(
            'VersionConflictException',
            'The version for thing %s does not match the expected version.' % name
        )

View File

@ -9,7 +9,8 @@ from moto.core import BaseBackend, BaseModel
from collections import OrderedDict from collections import OrderedDict
from .exceptions import ( from .exceptions import (
ResourceNotFoundException, ResourceNotFoundException,
InvalidRequestException InvalidRequestException,
VersionConflictException
) )
@ -44,6 +45,7 @@ class FakeThingType(BaseModel):
self.region_name = region_name self.region_name = region_name
self.thing_type_name = thing_type_name self.thing_type_name = thing_type_name
self.thing_type_properties = thing_type_properties self.thing_type_properties = thing_type_properties
self.thing_type_id = str(uuid.uuid4()) # I don't know the rule of id
t = time.time() t = time.time()
self.metadata = { self.metadata = {
'deprecated': False, 'deprecated': False,
@ -54,11 +56,37 @@ class FakeThingType(BaseModel):
def to_dict(self): def to_dict(self):
return { return {
'thingTypeName': self.thing_type_name, 'thingTypeName': self.thing_type_name,
'thingTypeId': self.thing_type_id,
'thingTypeProperties': self.thing_type_properties, 'thingTypeProperties': self.thing_type_properties,
'thingTypeMetadata': self.metadata 'thingTypeMetadata': self.metadata
} }
class FakeThingGroup(BaseModel):
    """In-memory model of an IoT thing group."""

    def __init__(self, thing_group_name, parent_group_name, thing_group_properties, region_name):
        self.region_name = region_name
        self.thing_group_name = thing_group_name
        # I don't know the rule of id
        self.thing_group_id = str(uuid.uuid4())
        self.version = 1  # TODO: tmp
        self.parent_group_name = parent_group_name
        self.thing_group_properties = thing_group_properties or {}
        # NOTE(review): key is 'creationData' — AWS's API uses 'creationDate';
        # kept byte-identical here, confirm before renaming.
        self.metadata = {
            'creationData': int(time.time() * 1000) / 1000.0
        }
        self.arn = 'arn:aws:iot:%s:1:thinggroup/%s' % (self.region_name, thing_group_name)
        # things registered in this group, keyed by thing ARN
        self.things = OrderedDict()

    def to_dict(self):
        """Return the DescribeThingGroup-style dict for this group."""
        return {
            'thingGroupName': self.thing_group_name,
            'thingGroupId': self.thing_group_id,
            'version': self.version,
            'thingGroupProperties': self.thing_group_properties,
            'thingGroupMetadata': self.metadata
        }
class FakeCertificate(BaseModel): class FakeCertificate(BaseModel):
def __init__(self, certificate_pem, status, region_name): def __init__(self, certificate_pem, status, region_name):
m = hashlib.sha256() m = hashlib.sha256()
@ -137,6 +165,7 @@ class IoTBackend(BaseBackend):
self.region_name = region_name self.region_name = region_name
self.things = OrderedDict() self.things = OrderedDict()
self.thing_types = OrderedDict() self.thing_types = OrderedDict()
self.thing_groups = OrderedDict()
self.certificates = OrderedDict() self.certificates = OrderedDict()
self.policies = OrderedDict() self.policies = OrderedDict()
self.principal_policies = OrderedDict() self.principal_policies = OrderedDict()
@ -359,6 +388,125 @@ class IoTBackend(BaseBackend):
principals = [k[0] for k, v in self.principal_things.items() if k[1] == thing_name] principals = [k[0] for k, v in self.principal_things.items() if k[1] == thing_name]
return principals return principals
def describe_thing_group(self, thing_group_name):
    """Return the first thing group with the given name.

    Raises ResourceNotFoundException when no group matches.
    """
    for group in self.thing_groups.values():
        if group.thing_group_name == thing_group_name:
            return group
    raise ResourceNotFoundException()
def create_thing_group(self, thing_group_name, parent_group_name, thing_group_properties):
    """Create a thing group, index it by ARN, and return (name, arn, id)."""
    group = FakeThingGroup(thing_group_name, parent_group_name, thing_group_properties, self.region_name)
    self.thing_groups[group.arn] = group
    return group.thing_group_name, group.arn, group.thing_group_id
def delete_thing_group(self, thing_group_name, expected_version):
    """Delete the named thing group.

    NOTE(review): expected_version is accepted but never checked here —
    confirm whether a version-conflict error should be raised on mismatch.
    """
    group = self.describe_thing_group(thing_group_name)
    self.thing_groups.pop(group.arn)
def list_thing_groups(self, parent_group, name_prefix_filter, recursive):
    """Return all thing groups.

    parent_group, name_prefix_filter and recursive are currently ignored.
    """
    return self.thing_groups.values()
def update_thing_group(self, thing_group_name, thing_group_properties, expected_version):
    """Update a group's attributePayload and bump its version.

    Raises VersionConflictException when expected_version is given and does
    not match the stored version.  Returns the new version number.
    """
    thing_group = self.describe_thing_group(thing_group_name)
    if expected_version and expected_version != thing_group.version:
        raise VersionConflictException(thing_group_name)
    attribute_payload = thing_group_properties.get('attributePayload', None)
    if attribute_payload is not None and 'attributes' in attribute_payload:
        do_merge = attribute_payload.get('merge', False)
        attributes = attribute_payload['attributes']
        if not do_merge:
            # replace: incoming attributes overwrite the stored dict wholesale
            thing_group.thing_group_properties['attributePayload']['attributes'] = attributes
        else:
            # merge: incoming keys win over existing ones
            thing_group.thing_group_properties['attributePayload']['attributes'].update(attributes)
    elif attribute_payload is not None and 'attributes' not in attribute_payload:
        # NOTE(review): this sets a new `attributes` attribute on the group
        # object instead of clearing the stored attributePayload — looks
        # unintentional; confirm the intended clearing behavior.
        thing_group.attributes = {}
    thing_group.version = thing_group.version + 1
    return thing_group.version
def _identify_thing_group(self, thing_group_name, thing_group_arn):
    """Resolve a thing group from a name and/or an ARN.

    At least one must be provided; when both are provided they must refer
    to the same group.  Raises InvalidRequestException on mismatch or when
    both are missing; describe_thing_group raises ResourceNotFoundException
    for an unknown name.
    """
    # identify thing group
    if thing_group_name is None and thing_group_arn is None:
        # NOTE(review): message has a leading space — kept byte-identical
        # since callers/tests may match the exact text.
        raise InvalidRequestException(
            ' Both thingGroupArn and thingGroupName are empty. Need to specify at least one of them'
        )
    if thing_group_name is not None:
        thing_group = self.describe_thing_group(thing_group_name)
        if thing_group_arn and thing_group.arn != thing_group_arn:
            raise InvalidRequestException(
                'ThingGroupName thingGroupArn does not match specified thingGroupName in request'
            )
    elif thing_group_arn is not None:
        # ARN-only lookup
        if thing_group_arn not in self.thing_groups:
            raise InvalidRequestException()
        thing_group = self.thing_groups[thing_group_arn]
    return thing_group
def _identify_thing(self, thing_name, thing_arn):
    """Resolve a thing from a name and/or an ARN.

    Mirrors _identify_thing_group: at least one identifier must be given,
    and both must agree when both are given; otherwise raises
    InvalidRequestException.
    """
    # identify thing
    if thing_name is None and thing_arn is None:
        raise InvalidRequestException(
            'Both thingArn and thingName are empty. Need to specify at least one of them'
        )
    if thing_name is not None:
        thing = self.describe_thing(thing_name)
        if thing_arn and thing.arn != thing_arn:
            raise InvalidRequestException(
                'ThingName thingArn does not match specified thingName in request'
            )
    elif thing_arn is not None:
        # ARN-only lookup
        if thing_arn not in self.things:
            raise InvalidRequestException()
        thing = self.things[thing_arn]
    return thing
def add_thing_to_thing_group(self, thing_group_name, thing_group_arn, thing_name, thing_arn):
    """Register a thing in a thing group.

    Duplicate registration is silently ignored (AWS behavior).
    """
    group = self._identify_thing_group(thing_group_name, thing_group_arn)
    thing = self._identify_thing(thing_name, thing_arn)
    if thing.arn not in group.things:
        group.things[thing.arn] = thing
def remove_thing_from_thing_group(self, thing_group_name, thing_group_arn, thing_name, thing_arn):
    """Deregister a thing from a thing group.

    Removing a thing that is not registered is a no-op (AWS behavior).
    """
    group = self._identify_thing_group(thing_group_name, thing_group_arn)
    thing = self._identify_thing(thing_name, thing_arn)
    group.things.pop(thing.arn, None)
def list_things_in_thing_group(self, thing_group_name, recursive):
    """Return the things registered in the named group (`recursive` ignored)."""
    return self.describe_thing_group(thing_group_name).things.values()
def list_thing_groups_for_thing(self, thing_name):
    """Return [{'groupName', 'groupArn'}] for every group containing the thing."""
    thing = self.describe_thing(thing_name)
    return [
        {'groupName': group.thing_group_name, 'groupArn': group.arn}
        for group in self.list_thing_groups(None, None, None)
        if thing.arn in group.things
    ]
def update_thing_groups_for_thing(self, thing_name, thing_groups_to_add, thing_groups_to_remove):
    """Add the thing to every group in *to_add*, then remove it from *to_remove*."""
    thing = self.describe_thing(thing_name)
    for group_name in thing_groups_to_add:
        group = self.describe_thing_group(group_name)
        self.add_thing_to_thing_group(
            group.thing_group_name, None, thing.thing_name, None)
    for group_name in thing_groups_to_remove:
        group = self.describe_thing_group(group_name)
        self.remove_thing_from_thing_group(
            group.thing_group_name, None, thing.thing_name, None)
available_regions = boto3.session.Session().get_available_regions("iot") available_regions = boto3.session.Session().get_available_regions("iot")
iot_backends = {region: IoTBackend(region) for region in available_regions} iot_backends = {region: IoTBackend(region) for region in available_regions}

View File

@ -38,8 +38,7 @@ class IoTResponse(BaseResponse):
thing_types = self.iot_backend.list_thing_types( thing_types = self.iot_backend.list_thing_types(
thing_type_name=thing_type_name thing_type_name=thing_type_name
) )
# TODO: implement pagination in the future
# TODO: support next_token and max_results
next_token = None next_token = None
return json.dumps(dict(thingTypes=[_.to_dict() for _ in thing_types], nextToken=next_token)) return json.dumps(dict(thingTypes=[_.to_dict() for _ in thing_types], nextToken=next_token))
@ -54,7 +53,7 @@ class IoTResponse(BaseResponse):
attribute_value=attribute_value, attribute_value=attribute_value,
thing_type_name=thing_type_name, thing_type_name=thing_type_name,
) )
# TODO: support next_token and max_results # TODO: implement pagination in the future
next_token = None next_token = None
return json.dumps(dict(things=[_.to_dict() for _ in things], nextToken=next_token)) return json.dumps(dict(things=[_.to_dict() for _ in things], nextToken=next_token))
@ -63,7 +62,6 @@ class IoTResponse(BaseResponse):
thing = self.iot_backend.describe_thing( thing = self.iot_backend.describe_thing(
thing_name=thing_name, thing_name=thing_name,
) )
print(thing.to_dict(include_default_client_id=True))
return json.dumps(thing.to_dict(include_default_client_id=True)) return json.dumps(thing.to_dict(include_default_client_id=True))
def describe_thing_type(self): def describe_thing_type(self):
@ -135,7 +133,7 @@ class IoTResponse(BaseResponse):
# marker = self._get_param("marker") # marker = self._get_param("marker")
# ascending_order = self._get_param("ascendingOrder") # ascending_order = self._get_param("ascendingOrder")
certificates = self.iot_backend.list_certificates() certificates = self.iot_backend.list_certificates()
# TODO: handle pagination # TODO: implement pagination in the future
return json.dumps(dict(certificates=[_.to_dict() for _ in certificates])) return json.dumps(dict(certificates=[_.to_dict() for _ in certificates]))
def update_certificate(self): def update_certificate(self):
@ -162,7 +160,7 @@ class IoTResponse(BaseResponse):
# ascending_order = self._get_param("ascendingOrder") # ascending_order = self._get_param("ascendingOrder")
policies = self.iot_backend.list_policies() policies = self.iot_backend.list_policies()
# TODO: handle pagination # TODO: implement pagination in the future
return json.dumps(dict(policies=[_.to_dict() for _ in policies])) return json.dumps(dict(policies=[_.to_dict() for _ in policies]))
def get_policy(self): def get_policy(self):
@ -205,7 +203,7 @@ class IoTResponse(BaseResponse):
policies = self.iot_backend.list_principal_policies( policies = self.iot_backend.list_principal_policies(
principal_arn=principal principal_arn=principal
) )
# TODO: handle pagination # TODO: implement pagination in the future
next_marker = None next_marker = None
return json.dumps(dict(policies=[_.to_dict() for _ in policies], nextMarker=next_marker)) return json.dumps(dict(policies=[_.to_dict() for _ in policies], nextMarker=next_marker))
@ -217,7 +215,7 @@ class IoTResponse(BaseResponse):
principals = self.iot_backend.list_policy_principals( principals = self.iot_backend.list_policy_principals(
policy_name=policy_name, policy_name=policy_name,
) )
# TODO: handle pagination # TODO: implement pagination in the future
next_marker = None next_marker = None
return json.dumps(dict(principals=principals, nextMarker=next_marker)) return json.dumps(dict(principals=principals, nextMarker=next_marker))
@ -246,7 +244,7 @@ class IoTResponse(BaseResponse):
things = self.iot_backend.list_principal_things( things = self.iot_backend.list_principal_things(
principal_arn=principal, principal_arn=principal,
) )
# TODO: handle pagination # TODO: implement pagination in the future
next_token = None next_token = None
return json.dumps(dict(things=things, nextToken=next_token)) return json.dumps(dict(things=things, nextToken=next_token))
@ -256,3 +254,123 @@ class IoTResponse(BaseResponse):
thing_name=thing_name, thing_name=thing_name,
) )
return json.dumps(dict(principals=principals)) return json.dumps(dict(principals=principals))
def describe_thing_group(self):
    """Handle DescribeThingGroup: return the group's dict as JSON."""
    group = self.iot_backend.describe_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
    )
    return json.dumps(group.to_dict())
def create_thing_group(self):
    """Handle CreateThingGroup: create the group and echo name/arn/id."""
    group_name, group_arn, group_id = self.iot_backend.create_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
        parent_group_name=self._get_param("parentGroupName"),
        thing_group_properties=self._get_param("thingGroupProperties"),
    )
    return json.dumps(dict(
        thingGroupName=group_name,
        thingGroupArn=group_arn,
        thingGroupId=group_id)
    )
def delete_thing_group(self):
    """Handle DeleteThingGroup: delete the group and return an empty body."""
    self.iot_backend.delete_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
        expected_version=self._get_param("expectedVersion"),
    )
    return json.dumps(dict())
def list_thing_groups(self):
    """Handle ListThingGroups: return name/arn summaries for each group."""
    # next_token = self._get_param("nextToken")
    # max_results = self._get_int_param("maxResults")
    groups = self.iot_backend.list_thing_groups(
        parent_group=self._get_param("parentGroup"),
        name_prefix_filter=self._get_param("namePrefixFilter"),
        recursive=self._get_param("recursive"),
    )
    summaries = [
        {'groupName': group.thing_group_name, 'groupArn': group.arn}
        for group in groups
    ]
    # TODO: implement pagination in the future
    return json.dumps(dict(thingGroups=summaries, nextToken=None))
def update_thing_group(self):
    """Update a thing group's properties and return the new version number."""
    new_version = self.iot_backend.update_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
        thing_group_properties=self._get_param("thingGroupProperties"),
        expected_version=self._get_param("expectedVersion"),
    )
    return json.dumps({"version": new_version})
def add_thing_to_thing_group(self):
    """Attach a thing to a group; name and ARN forms are both accepted."""
    self.iot_backend.add_thing_to_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
        thing_group_arn=self._get_param("thingGroupArn"),
        thing_name=self._get_param("thingName"),
        thing_arn=self._get_param("thingArn"),
    )
    return json.dumps({})
def remove_thing_from_thing_group(self):
    """Detach a thing from a group; name and ARN forms are both accepted."""
    self.iot_backend.remove_thing_from_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
        thing_group_arn=self._get_param("thingGroupArn"),
        thing_name=self._get_param("thingName"),
        thing_arn=self._get_param("thingArn"),
    )
    return json.dumps({})
def list_things_in_thing_group(self):
    """List the names of the things in a group.

    Pagination (nextToken/maxResults) is not implemented yet, so
    nextToken is always null.
    """
    # next_token = self._get_param("nextToken")
    # max_results = self._get_int_param("maxResults")
    members = self.iot_backend.list_things_in_thing_group(
        thing_group_name=self._get_param("thingGroupName"),
        recursive=self._get_param("recursive"),
    )
    member_names = [member.thing_name for member in members]
    # TODO: implement pagination in the future
    return json.dumps(dict(things=member_names, nextToken=None))
def list_thing_groups_for_thing(self):
    """List the groups a thing belongs to (pagination not implemented)."""
    # next_token = self._get_param("nextToken")
    # max_results = self._get_int_param("maxResults")
    groups = self.iot_backend.list_thing_groups_for_thing(
        thing_name=self._get_param("thingName")
    )
    # TODO: implement pagination in the future
    return json.dumps(dict(thingGroups=groups, nextToken=None))
def update_thing_groups_for_thing(self):
    """Add a thing to and/or remove it from several groups in one call."""
    to_add = self._get_param("thingGroupsToAdd") or []
    to_remove = self._get_param("thingGroupsToRemove") or []
    self.iot_backend.update_thing_groups_for_thing(
        thing_name=self._get_param("thingName"),
        thing_groups_to_add=to_add,
        thing_groups_to_remove=to_remove,
    )
    return json.dumps({})

View File

@ -111,3 +111,30 @@ class MalformedXML(S3ClientError):
"MalformedXML", "MalformedXML",
"The XML you provided was not well-formed or did not validate against our published schema", "The XML you provided was not well-formed or did not validate against our published schema",
*args, **kwargs) *args, **kwargs)
class MalformedACLError(S3ClientError):
    """400 error returned when a request ACL body fails XML validation."""
    code = 400

    def __init__(self, *args, **kwargs):
        super(MalformedACLError, self).__init__(
            "MalformedACLError",
            "The XML you provided was not well-formed or did not validate against our published schema",
            *args,
            **kwargs)
class InvalidTargetBucketForLogging(S3ClientError):
    """400 error for a missing or mis-configured logging target bucket."""
    code = 400

    def __init__(self, msg):
        super(InvalidTargetBucketForLogging, self).__init__(
            "InvalidTargetBucketForLogging", msg)
class CrossLocationLoggingProhibitted(S3ClientError):
    """403 error when the logging target bucket is in another region."""
    code = 403

    def __init__(self):
        super(CrossLocationLoggingProhibitted, self).__init__(
            "CrossLocationLoggingProhibitted",
            "Cross S3 location logging not allowed."
        )

View File

@ -347,6 +347,7 @@ class FakeBucket(BaseModel):
self.acl = get_canned_acl('private') self.acl = get_canned_acl('private')
self.tags = FakeTagging() self.tags = FakeTagging()
self.cors = [] self.cors = []
self.logging = {}
@property @property
def location(self): def location(self):
@ -422,6 +423,40 @@ class FakeBucket(BaseModel):
def tagging(self): def tagging(self):
return self.tags return self.tags
def set_logging(self, logging_config, bucket_backend):
    """Validate and store a bucket-logging configuration on this bucket.

    :param logging_config: parsed ``LoggingEnabled`` dict, or falsy to
        disable logging.
    :param bucket_backend: the S3 backend, used to look up the target bucket.
    :raises InvalidTargetBucketForLogging: target bucket missing or lacking
        the required log-delivery grants.
    :raises CrossLocationLoggingProhibitted: target bucket is in a
        different region.
    """
    if not logging_config:
        # A falsy config disables logging entirely.
        self.logging = {}
    else:
        # Local import — presumably to avoid a circular import at module
        # load time (TODO confirm).
        from moto.s3.exceptions import InvalidTargetBucketForLogging, CrossLocationLoggingProhibitted

        # Target bucket must exist in the same account (assuming all moto buckets are in the same account):
        if not bucket_backend.buckets.get(logging_config["TargetBucket"]):
            raise InvalidTargetBucketForLogging("The target bucket for logging does not exist.")

        # Does the target bucket have the log-delivery WRITE and READ_ACP permissions?
        write = read_acp = False
        for grant in bucket_backend.buckets[logging_config["TargetBucket"]].acl.grants:
            # Must be granted to: http://acs.amazonaws.com/groups/s3/LogDelivery
            for grantee in grant.grantees:
                if grantee.uri == "http://acs.amazonaws.com/groups/s3/LogDelivery":
                    if "WRITE" in grant.permissions or "FULL_CONTROL" in grant.permissions:
                        write = True

                    if "READ_ACP" in grant.permissions or "FULL_CONTROL" in grant.permissions:
                        read_acp = True

                    break

        if not write or not read_acp:
            raise InvalidTargetBucketForLogging("You must give the log-delivery group WRITE and READ_ACP"
                                                " permissions to the target bucket")

        # Buckets must also exist within the same region:
        if bucket_backend.buckets[logging_config["TargetBucket"]].region_name != self.region_name:
            raise CrossLocationLoggingProhibitted()

        # Checks pass -- set the logging config:
        self.logging = logging_config
def set_website_configuration(self, website_configuration): def set_website_configuration(self, website_configuration):
self.website_configuration = website_configuration self.website_configuration = website_configuration
@ -608,6 +643,10 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
bucket.set_cors(cors_rules) bucket.set_cors(cors_rules)
def put_bucket_logging(self, bucket_name, logging_config):
    """Validate and apply *logging_config* to the named bucket."""
    target = self.get_bucket(bucket_name)
    target.set_logging(logging_config, self)
def delete_bucket_cors(self, bucket_name): def delete_bucket_cors(self, bucket_name):
bucket = self.get_bucket(bucket_name) bucket = self.get_bucket(bucket_name)
bucket.delete_cors() bucket.delete_cors()

View File

@ -11,11 +11,13 @@ import xmltodict
from moto.packages.httpretty.core import HTTPrettyRequest from moto.packages.httpretty.core import HTTPrettyRequest
from moto.core.responses import _TemplateEnvironmentMixin from moto.core.responses import _TemplateEnvironmentMixin
from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, \
parse_key_name as bucketpath_parse_key_name, is_delete_keys as bucketpath_is_delete_keys
from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder, MalformedXML, \
from .exceptions import BucketAlreadyExists, S3ClientError, MissingBucket, MissingKey, InvalidPartOrder MalformedACLError
from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, FakeTag from .models import s3_backend, get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey, FakeTagging, FakeTagSet, \
FakeTag
from .utils import bucket_name_from_url, metadata_from_headers from .utils import bucket_name_from_url, metadata_from_headers
from xml.dom import minidom from xml.dom import minidom
@ -70,8 +72,9 @@ class ResponseObject(_TemplateEnvironmentMixin):
match = re.match(r'^\[(.+)\](:\d+)?$', host) match = re.match(r'^\[(.+)\](:\d+)?$', host)
if match: if match:
match = re.match(r'^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z', match = re.match(
match.groups()[0], re.IGNORECASE) r'^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z',
match.groups()[0], re.IGNORECASE)
if match: if match:
return False return False
@ -229,6 +232,13 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 404, {}, template.render(bucket_name=bucket_name) return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_TAGGING_RESPONSE) template = self.response_template(S3_BUCKET_TAGGING_RESPONSE)
return template.render(bucket=bucket) return template.render(bucket=bucket)
elif 'logging' in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.logging:
template = self.response_template(S3_NO_LOGGING_CONFIG)
return 200, {}, template.render()
template = self.response_template(S3_LOGGING_CONFIG)
return 200, {}, template.render(logging=bucket.logging)
elif "cors" in querystring: elif "cors" in querystring:
bucket = self.backend.get_bucket(bucket_name) bucket = self.backend.get_bucket(bucket_name)
if len(bucket.cors) == 0: if len(bucket.cors) == 0:
@ -324,8 +334,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
limit = continuation_token or start_after limit = continuation_token or start_after
result_keys = self._get_results_from_token(result_keys, limit) result_keys = self._get_results_from_token(result_keys, limit)
result_keys, is_truncated, \ result_keys, is_truncated, next_continuation_token = self._truncate_result(result_keys, max_keys)
next_continuation_token = self._truncate_result(result_keys, max_keys)
return template.render( return template.render(
bucket=bucket, bucket=bucket,
@ -380,8 +389,11 @@ class ResponseObject(_TemplateEnvironmentMixin):
self.backend.set_bucket_policy(bucket_name, body) self.backend.set_bucket_policy(bucket_name, body)
return 'True' return 'True'
elif 'acl' in querystring: elif 'acl' in querystring:
# TODO: Support the XML-based ACL format # Headers are first. If not set, then look at the body (consistent with the documentation):
self.backend.set_bucket_acl(bucket_name, self._acl_from_headers(request.headers)) acls = self._acl_from_headers(request.headers)
if not acls:
acls = self._acl_from_xml(body)
self.backend.set_bucket_acl(bucket_name, acls)
return "" return ""
elif "tagging" in querystring: elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body) tagging = self._bucket_tagging_from_xml(body)
@ -391,12 +403,18 @@ class ResponseObject(_TemplateEnvironmentMixin):
self.backend.set_bucket_website_configuration(bucket_name, body) self.backend.set_bucket_website_configuration(bucket_name, body)
return "" return ""
elif "cors" in querystring: elif "cors" in querystring:
from moto.s3.exceptions import MalformedXML
try: try:
self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body)) self.backend.put_bucket_cors(bucket_name, self._cors_from_xml(body))
return "" return ""
except KeyError: except KeyError:
raise MalformedXML() raise MalformedXML()
elif "logging" in querystring:
try:
self.backend.put_bucket_logging(bucket_name, self._logging_from_xml(body))
return ""
except KeyError:
raise MalformedXML()
else: else:
if body: if body:
try: try:
@ -515,6 +533,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
def toint(i): def toint(i):
return int(i) if i else None return int(i) if i else None
begin, end = map(toint, rspec.split('-')) begin, end = map(toint, rspec.split('-'))
if begin is not None: # byte range if begin is not None: # byte range
end = last if end is None else min(end, last) end = last if end is None else min(end, last)
@ -731,6 +750,58 @@ class ResponseObject(_TemplateEnvironmentMixin):
else: else:
return 404, response_headers, "" return 404, response_headers, ""
def _acl_from_xml(self, xml):
    """Parse an AccessControlPolicy XML body into a FakeAcl.

    Returns an empty list when the policy has no AccessControlList,
    and raises MalformedACLError for structurally invalid documents.
    """
    parsed = xmltodict.parse(xml)
    policy = parsed.get("AccessControlPolicy")
    if not policy:
        raise MalformedACLError()

    # The owner is needed for some reason...
    # TODO: Validate that the Owner is actually correct.
    if not policy.get("Owner"):
        raise MalformedACLError()

    # If empty, then no ACLs:
    acl = policy.get("AccessControlList")
    if acl is None:
        return []

    if not acl.get("Grant"):
        raise MalformedACLError()

    allowed_permissions = [
        "READ",
        "WRITE",
        "READ_ACP",
        "WRITE_ACP",
        "FULL_CONTROL"
    ]

    grant_entries = acl["Grant"]
    if not isinstance(grant_entries, list):
        # xmltodict yields a bare dict for a single <Grant>; normalize it.
        grant_entries = [grant_entries]

    grants = self._get_grants_from_xml(grant_entries, MalformedACLError,
                                       allowed_permissions)
    return FakeAcl(grants)
def _get_grants_from_xml(self, grant_list, exception_type, permissions):
    """Convert parsed <Grant> dicts into FakeGrant objects.

    :param grant_list: list of dicts produced by xmltodict.
    :param exception_type: exception class raised for invalid entries.
    :param permissions: the permission strings accepted in this context.
    """
    valid_grantee_types = ("CanonicalUser", "AmazonCustomerByEmail", "Group")
    result = []
    for entry in grant_list:
        if entry.get("Permission", "") not in permissions:
            raise exception_type()

        grantee = entry["Grantee"]
        if grantee.get("@xsi:type", "") not in valid_grantee_types:
            raise exception_type()

        # TODO: Verify that the proper grantee data is supplied based on the type.
        fake_grantee = FakeGrantee(
            id=grantee.get("ID", ""),
            display_name=grantee.get("DisplayName", ""),
            uri=grantee.get("URI", ""),
        )
        result.append(FakeGrant([fake_grantee], [entry["Permission"]]))

    return result
def _acl_from_headers(self, headers): def _acl_from_headers(self, headers):
canned_acl = headers.get('x-amz-acl', '') canned_acl = headers.get('x-amz-acl', '')
if canned_acl: if canned_acl:
@ -814,6 +885,42 @@ class ResponseObject(_TemplateEnvironmentMixin):
return [parsed_xml["CORSConfiguration"]["CORSRule"]] return [parsed_xml["CORSConfiguration"]["CORSRule"]]
def _logging_from_xml(self, xml):
    """Parse a BucketLoggingStatus XML body into a logging-config dict.

    Returns {} when logging is disabled; otherwise the LoggingEnabled
    dict with TargetGrants replaced by FakeGrant objects.
    """
    parsed = xmltodict.parse(xml)
    enabled = parsed["BucketLoggingStatus"].get("LoggingEnabled")
    if not enabled:
        return {}

    if not enabled.get("TargetBucket"):
        raise MalformedXML()

    # TargetPrefix is optional; default it to the empty string.
    if not enabled.get("TargetPrefix"):
        enabled["TargetPrefix"] = ""

    # Get the ACLs:
    if enabled.get("TargetGrants"):
        allowed_permissions = [
            "READ",
            "WRITE",
            "FULL_CONTROL"
        ]
        raw_grants = enabled["TargetGrants"]["Grant"]
        if not isinstance(raw_grants, list):
            # xmltodict yields a bare dict for a single <Grant>; normalize it.
            raw_grants = [raw_grants]
        enabled["TargetGrants"] = self._get_grants_from_xml(
            raw_grants, MalformedXML, allowed_permissions)

    return enabled
def _key_response_delete(self, bucket_name, query, key_name, headers): def _key_response_delete(self, bucket_name, query, key_name, headers):
if query.get('uploadId'): if query.get('uploadId'):
upload_id = query['uploadId'][0] upload_id = query['uploadId'][0]
@ -1322,3 +1429,37 @@ S3_NO_CORS_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId> <HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error> </Error>
""" """
S3_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<BucketLoggingStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01">
<LoggingEnabled>
<TargetBucket>{{ logging["TargetBucket"] }}</TargetBucket>
<TargetPrefix>{{ logging["TargetPrefix"] }}</TargetPrefix>
{% if logging.get("TargetGrants") %}
<TargetGrants>
{% for grant in logging["TargetGrants"] %}
<Grant>
<Grantee xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:type="{{ grant.grantees[0].type }}">
{% if grant.grantees[0].uri %}
<URI>{{ grant.grantees[0].uri }}</URI>
{% endif %}
{% if grant.grantees[0].id %}
<ID>{{ grant.grantees[0].id }}</ID>
{% endif %}
{% if grant.grantees[0].display_name %}
<DisplayName>{{ grant.grantees[0].display_name }}</DisplayName>
{% endif %}
</Grantee>
<Permission>{{ grant.permissions[0] }}</Permission>
</Grant>
{% endfor %}
</TargetGrants>
{% endif %}
</LoggingEnabled>
</BucketLoggingStatus>
"""
S3_NO_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<BucketLoggingStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01" />
"""

View File

@ -51,7 +51,7 @@ def mock_xray_client(f):
aws_xray_sdk.core.xray_recorder._emitter = MockEmitter() aws_xray_sdk.core.xray_recorder._emitter = MockEmitter()
try: try:
f(*args, **kwargs) return f(*args, **kwargs)
finally: finally:
if old_xray_context_var is None: if old_xray_context_var is None:

27
scripts/bump_version Executable file
View File

@ -0,0 +1,27 @@
#!/bin/bash
# Bump the moto version, create a release branch, and open a PR on GitHub.
#
# Usage: scripts/bump_version <new-version>

main() {
    set -euo pipefail  # Bash safemode

    # ${1:-} prevents an "unbound variable" abort under `set -u` when the
    # version argument is omitted, so the usage message below can print.
    local version=${1:-}
    if [[ -z "${version}" ]]; then
        echo "USAGE: $0 1.3.2"
        echo "Provide a new version number as an argument to bump the version"
        echo -n "Current:"
        grep version= setup.py
        return 1
    fi

    # Install bumpversion on demand.
    &>/dev/null which bumpversion || pip install bumpversion

    bumpversion --new-version "${version}" patch

    git checkout -b "version-${version}"

    # Commit the new version
    git commit -a -m "bumping to version ${version}"
    # Commit an updated IMPLEMENTATION_COVERAGE.md
    make implementation_coverage || true
    # Open a PR
    open "https://github.com/spulec/moto/compare/master...version-${version}"
}

# Quote "$@" so arguments with spaces survive word splitting.
main "$@"

View File

@ -39,7 +39,7 @@ else:
setup( setup(
name='moto', name='moto',
version='1.1.25', version='1.2.0',
description='A library that allows your python tests to easily' description='A library that allows your python tests to easily'
' mock out the boto library', ' mock out the boto library',
author='Steve Pulec', author='Steve Pulec',

View File

@ -752,6 +752,9 @@ def test_vpc_single_instance_in_subnet():
security_group.vpc_id.should.equal(vpc.id) security_group.vpc_id.should.equal(vpc.id)
stack = conn.describe_stacks()[0] stack = conn.describe_stacks()[0]
vpc.tags.should.have.key('Application').which.should.equal(stack.stack_id)
resources = stack.describe_resources() resources = stack.describe_resources()
vpc_resource = [ vpc_resource = [
resource for resource in resources if resource.resource_type == 'AWS::EC2::VPC'][0] resource for resource in resources if resource.resource_type == 'AWS::EC2::VPC'][0]

View File

@ -705,3 +705,17 @@ def test_ami_filter_by_owner_id():
assert all(ubuntu_ids) and ubuntu_ids[0] == ubuntu_id assert all(ubuntu_ids) and ubuntu_ids[0] == ubuntu_id
# Check we actually have a subset of images # Check we actually have a subset of images
assert len(ubuntu_ids) < len(all_ids) assert len(ubuntu_ids) < len(all_ids)
@mock_ec2
def test_ami_filter_by_self():
    """describe_images(Owners=['self']) returns only images this account owns."""
    # Bug fix: the original referenced an undefined `ec2` resource and
    # called len() on the response dict instead of its 'Images' list.
    ec2 = boto3.resource('ec2', region_name='us-east-1')
    client = boto3.client('ec2', region_name='us-east-1')

    # No self-owned images exist before one is created.
    my_images = client.describe_images(Owners=['self'])['Images']
    assert len(my_images) == 0

    # Create a new image
    instance = ec2.create_instances(ImageId='ami-1234abcd', MinCount=1, MaxCount=1)[0]
    image = instance.create_image(Name='test-image')

    my_images = client.describe_images(Owners=['self'])['Images']
    assert len(my_images) == 1

View File

@ -36,6 +36,11 @@ def test_boto3_describe_regions():
for rec in resp['Regions']: for rec in resp['Regions']:
rec['Endpoint'].should.contain(rec['RegionName']) rec['Endpoint'].should.contain(rec['RegionName'])
test_region = 'us-east-1'
resp = ec2.describe_regions(RegionNames=[test_region])
resp['Regions'].should.have.length_of(1)
resp['Regions'][0].should.have.key('RegionName').which.should.equal(test_region)
@mock_ec2 @mock_ec2
def test_boto3_availability_zones(): def test_boto3_availability_zones():

View File

@ -9,7 +9,7 @@ import re
import sure # noqa import sure # noqa
import boto3 import boto3
from botocore.exceptions import ClientError from botocore.exceptions import ClientError, ParamValidationError
from dateutil.tz import tzlocal from dateutil.tz import tzlocal
from moto import mock_ecr from moto import mock_ecr
@ -445,3 +445,117 @@ def test_get_authorization_token_explicit_regions():
} }
]) ])
@mock_ecr
def test_batch_get_image():
    """batch_get_image with an existing tag returns exactly that image."""
    client = boto3.client('ecr', region_name='us-east-1')
    _ = client.create_repository(
        repositoryName='test_repository'
    )

    # Put three tagged images so the lookup must select the right one.
    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='latest'
    )

    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='v1'
    )

    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='v2'
    )

    response = client.batch_get_image(
        repositoryName='test_repository',
        imageIds=[
            {
                'imageTag': 'v2'
            },
        ],
    )

    # Exactly one match, no failures.
    type(response['images']).should.be(list)
    len(response['images']).should.be(1)

    response['images'][0]['imageManifest'].should.contain("vnd.docker.distribution.manifest.v2+json")
    response['images'][0]['registryId'].should.equal("012345678910")
    response['images'][0]['repositoryName'].should.equal("test_repository")

    response['images'][0]['imageId']['imageTag'].should.equal("v2")
    response['images'][0]['imageId']['imageDigest'].should.contain("sha")

    type(response['failures']).should.be(list)
    len(response['failures']).should.be(0)
@mock_ecr
def test_batch_get_image_that_doesnt_exist():
    """batch_get_image with an unknown tag yields a failure entry, no images."""
    client = boto3.client('ecr', region_name='us-east-1')
    _ = client.create_repository(
        repositoryName='test_repository'
    )

    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='latest'
    )

    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='v1'
    )

    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='v2'
    )

    # 'v5' was never pushed.
    response = client.batch_get_image(
        repositoryName='test_repository',
        imageIds=[
            {
                'imageTag': 'v5'
            },
        ],
    )

    type(response['images']).should.be(list)
    len(response['images']).should.be(0)

    type(response['failures']).should.be(list)
    len(response['failures']).should.be(1)
    response['failures'][0]['failureReason'].should.equal("Requested image not found")
    response['failures'][0]['failureCode'].should.equal("ImageNotFound")
    response['failures'][0]['imageId']['imageTag'].should.equal("v5")
@mock_ecr
def test_batch_get_image_no_tags():
    """Omitting the required imageIds parameter raises ParamValidationError."""
    client = boto3.client('ecr', region_name='us-east-1')
    _ = client.create_repository(
        repositoryName='test_repository'
    )

    _ = client.put_image(
        repositoryName='test_repository',
        imageManifest=json.dumps(_create_image_manifest()),
        imageTag='latest'
    )

    # botocore itself rejects the call before it reaches moto.
    error_msg = re.compile(
        r".*Missing required parameter in input: \"imageIds\".*",
        re.MULTILINE)

    client.batch_get_image.when.called_with(
        repositoryName='test_repository').should.throw(
            ParamValidationError, error_msg)

View File

@ -651,3 +651,21 @@ def test_attach_detach_user_policy():
resp = client.list_attached_user_policies(UserName=user.name) resp = client.list_attached_user_policies(UserName=user.name)
resp['AttachedPolicies'].should.have.length_of(0) resp['AttachedPolicies'].should.have.length_of(0)
@mock_iam
def test_update_access_key():
    """update_access_key flips a key's Status; unknown key ids must raise."""
    iam = boto3.resource('iam', region_name='us-east-1')
    client = iam.meta.client
    username = 'test-user'
    iam.create_user(UserName=username)
    # Updating a key that does not exist is a client error.
    with assert_raises(ClientError):
        client.update_access_key(UserName=username,
                                 AccessKeyId='non-existent-key',
                                 Status='Inactive')
    key = client.create_access_key(UserName=username)['AccessKey']
    client.update_access_key(UserName=username,
                             AccessKeyId=key['AccessKeyId'],
                             Status='Inactive')
    resp = client.list_access_keys(UserName=username)
    resp['AccessKeyMetadata'][0]['Status'].should.equal('Inactive')

View File

@ -177,3 +177,192 @@ def test_principal_thing():
res.should.have.key('things').which.should.have.length_of(0) res.should.have.key('things').which.should.have.length_of(0)
res = client.list_thing_principals(thingName=thing_name) res = client.list_thing_principals(thingName=thing_name)
res.should.have.key('principals').which.should.have.length_of(0) res.should.have.key('principals').which.should.have.length_of(0)
@mock_iot
def test_thing_groups():
    """Exercise thing-group CRUD plus attribute merge/replace semantics."""
    client = boto3.client('iot', region_name='ap-northeast-1')
    # NOTE(review): `name` is unused in this test.
    name = 'my-thing'
    group_name = 'my-group-name'

    # thing group
    thing_group = client.create_thing_group(thingGroupName=group_name)
    thing_group.should.have.key('thingGroupName').which.should.equal(group_name)
    thing_group.should.have.key('thingGroupArn')

    res = client.list_thing_groups()
    res.should.have.key('thingGroups').which.should.have.length_of(1)
    for thing_group in res['thingGroups']:
        thing_group.should.have.key('groupName').which.should_not.be.none
        thing_group.should.have.key('groupArn').which.should_not.be.none

    thing_group = client.describe_thing_group(thingGroupName=group_name)
    thing_group.should.have.key('thingGroupName').which.should.equal(group_name)
    thing_group.should.have.key('thingGroupProperties')
    thing_group.should.have.key('thingGroupMetadata')
    thing_group.should.have.key('version')

    # delete thing group
    client.delete_thing_group(thingGroupName=group_name)
    res = client.list_thing_groups()
    res.should.have.key('thingGroups').which.should.have.length_of(0)

    # props create test
    props = {
        'thingGroupDescription': 'my first thing group',
        'attributePayload': {
            'attributes': {
                'key1': 'val01',
                'Key02': 'VAL2'
            }
        }
    }
    thing_group = client.create_thing_group(thingGroupName=group_name, thingGroupProperties=props)
    thing_group.should.have.key('thingGroupName').which.should.equal(group_name)
    thing_group.should.have.key('thingGroupArn')

    thing_group = client.describe_thing_group(thingGroupName=group_name)
    thing_group.should.have.key('thingGroupProperties')\
        .which.should.have.key('attributePayload')\
        .which.should.have.key('attributes')
    res_props = thing_group['thingGroupProperties']['attributePayload']['attributes']
    res_props.should.have.key('key1').which.should.equal('val01')
    res_props.should.have.key('Key02').which.should.equal('VAL2')

    # props update test with merge
    # With merge=True the new attribute is added alongside the old ones.
    new_props = {
        'attributePayload': {
            'attributes': {
                'k3': 'v3'
            },
            'merge': True
        }
    }
    client.update_thing_group(
        thingGroupName=group_name,
        thingGroupProperties=new_props
    )
    thing_group = client.describe_thing_group(thingGroupName=group_name)
    thing_group.should.have.key('thingGroupProperties')\
        .which.should.have.key('attributePayload')\
        .which.should.have.key('attributes')
    res_props = thing_group['thingGroupProperties']['attributePayload']['attributes']
    res_props.should.have.key('key1').which.should.equal('val01')
    res_props.should.have.key('Key02').which.should.equal('VAL2')

    res_props.should.have.key('k3').which.should.equal('v3')

    # props update test
    # Without merge, the attribute payload is replaced entirely.
    new_props = {
        'attributePayload': {
            'attributes': {
                'k4': 'v4'
            }
        }
    }
    client.update_thing_group(
        thingGroupName=group_name,
        thingGroupProperties=new_props
    )
    thing_group = client.describe_thing_group(thingGroupName=group_name)
    thing_group.should.have.key('thingGroupProperties')\
        .which.should.have.key('attributePayload')\
        .which.should.have.key('attributes')
    res_props = thing_group['thingGroupProperties']['attributePayload']['attributes']
    res_props.should.have.key('k4').which.should.equal('v4')
    res_props.should_not.have.key('key1')
@mock_iot
def test_thing_group_relations():
    """Attach/detach things to a group via every name/ARN combination."""
    client = boto3.client('iot', region_name='ap-northeast-1')
    name = 'my-thing'
    group_name = 'my-group-name'

    # thing group
    thing_group = client.create_thing_group(thingGroupName=group_name)
    thing_group.should.have.key('thingGroupName').which.should.equal(group_name)
    thing_group.should.have.key('thingGroupArn')

    # thing
    thing = client.create_thing(thingName=name)
    thing.should.have.key('thingName').which.should.equal(name)
    thing.should.have.key('thingArn')

    # add in 4 way
    # All four name/ARN combinations must resolve to the same membership,
    # so repeated adds still leave exactly one thing in the group.
    client.add_thing_to_thing_group(
        thingGroupName=group_name,
        thingName=name
    )
    client.add_thing_to_thing_group(
        thingGroupArn=thing_group['thingGroupArn'],
        thingArn=thing['thingArn']
    )
    client.add_thing_to_thing_group(
        thingGroupName=group_name,
        thingArn=thing['thingArn']
    )
    client.add_thing_to_thing_group(
        thingGroupArn=thing_group['thingGroupArn'],
        thingName=name
    )

    things = client.list_things_in_thing_group(
        thingGroupName=group_name
    )
    things.should.have.key('things')
    things['things'].should.have.length_of(1)

    thing_groups = client.list_thing_groups_for_thing(
        thingName=name
    )
    thing_groups.should.have.key('thingGroups')
    thing_groups['thingGroups'].should.have.length_of(1)

    # remove in 4 way
    # Removal must likewise be idempotent across the four combinations.
    client.remove_thing_from_thing_group(
        thingGroupName=group_name,
        thingName=name
    )
    client.remove_thing_from_thing_group(
        thingGroupArn=thing_group['thingGroupArn'],
        thingArn=thing['thingArn']
    )
    client.remove_thing_from_thing_group(
        thingGroupName=group_name,
        thingArn=thing['thingArn']
    )
    client.remove_thing_from_thing_group(
        thingGroupArn=thing_group['thingGroupArn'],
        thingName=name
    )
    things = client.list_things_in_thing_group(
        thingGroupName=group_name
    )
    things.should.have.key('things')
    things['things'].should.have.length_of(0)

    # update thing group for thing
    client.update_thing_groups_for_thing(
        thingName=name,
        thingGroupsToAdd=[
            group_name
        ]
    )
    things = client.list_things_in_thing_group(
        thingGroupName=group_name
    )
    things.should.have.key('things')
    things['things'].should.have.length_of(1)

    client.update_thing_groups_for_thing(
        thingName=name,
        thingGroupsToRemove=[
            group_name
        ]
    )
    things = client.list_things_in_thing_group(
        thingGroupName=group_name
    )
    things.should.have.key('things')
    things['things'].should.have.length_of(0)

View File

@ -50,6 +50,7 @@ def reduced_min_part_size(f):
return f(*args, **kwargs) return f(*args, **kwargs)
finally: finally:
s3model.UPLOAD_PART_MIN_SIZE = orig_size s3model.UPLOAD_PART_MIN_SIZE = orig_size
return wrapped return wrapped
@ -883,11 +884,12 @@ def test_s3_object_in_public_bucket():
s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get() s3_anonymous.Object(key='file.txt', bucket_name='test-bucket').get()
exc.exception.response['Error']['Code'].should.equal('403') exc.exception.response['Error']['Code'].should.equal('403')
params = {'Bucket': 'test-bucket','Key': 'file.txt'} params = {'Bucket': 'test-bucket', 'Key': 'file.txt'}
presigned_url = boto3.client('s3').generate_presigned_url('get_object', params, ExpiresIn=900) presigned_url = boto3.client('s3').generate_presigned_url('get_object', params, ExpiresIn=900)
response = requests.get(presigned_url) response = requests.get(presigned_url)
assert response.status_code == 200 assert response.status_code == 200
@mock_s3 @mock_s3
def test_s3_object_in_private_bucket(): def test_s3_object_in_private_bucket():
s3 = boto3.resource('s3') s3 = boto3.resource('s3')
@ -1102,6 +1104,7 @@ def test_boto3_key_etag():
resp = s3.get_object(Bucket='mybucket', Key='steve') resp = s3.get_object(Bucket='mybucket', Key='steve')
resp['ETag'].should.equal('"d32bda93738f7e03adb22e66c90fbc04"') resp['ETag'].should.equal('"d32bda93738f7e03adb22e66c90fbc04"')
@mock_s3 @mock_s3
def test_website_redirect_location(): def test_website_redirect_location():
s3 = boto3.client('s3', region_name='us-east-1') s3 = boto3.client('s3', region_name='us-east-1')
@ -1116,6 +1119,7 @@ def test_website_redirect_location():
resp = s3.get_object(Bucket='mybucket', Key='steve') resp = s3.get_object(Bucket='mybucket', Key='steve')
resp['WebsiteRedirectLocation'].should.equal(url) resp['WebsiteRedirectLocation'].should.equal(url)
@mock_s3 @mock_s3
def test_boto3_list_keys_xml_escaped(): def test_boto3_list_keys_xml_escaped():
s3 = boto3.client('s3', region_name='us-east-1') s3 = boto3.client('s3', region_name='us-east-1')
@ -1627,7 +1631,7 @@ def test_boto3_put_bucket_cors():
}) })
e = err.exception e = err.exception
e.response["Error"]["Code"].should.equal("InvalidRequest") e.response["Error"]["Code"].should.equal("InvalidRequest")
e.response["Error"]["Message"].should.equal("Found unsupported HTTP method in CORS config. " e.response["Error"]["Message"].should.equal("Found unsupported HTTP method in CORS config. "
"Unsupported method is NOTREAL") "Unsupported method is NOTREAL")
with assert_raises(ClientError) as err: with assert_raises(ClientError) as err:
@ -1732,6 +1736,249 @@ def test_boto3_delete_bucket_cors():
e.response["Error"]["Message"].should.equal("The CORS configuration does not exist") e.response["Error"]["Message"].should.equal("The CORS configuration does not exist")
@mock_s3
def test_put_bucket_acl_body():
    """PUT bucket ACL via an XML/body AccessControlPolicy: valid grants are
    stored, a missing Owner or bad Permission is a MalformedACLError, and an
    empty grant list clears the ACLs."""
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="bucket")
    bucket_owner = s3.get_bucket_acl(Bucket="bucket")["Owner"]
    s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
        "Grants": [
            {
                "Grantee": {
                    "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
                    "Type": "Group"
                },
                "Permission": "WRITE"
            },
            {
                "Grantee": {
                    "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
                    "Type": "Group"
                },
                "Permission": "READ_ACP"
            }
        ],
        "Owner": bucket_owner
    })

    result = s3.get_bucket_acl(Bucket="bucket")
    assert len(result["Grants"]) == 2
    for g in result["Grants"]:
        assert g["Grantee"]["URI"] == "http://acs.amazonaws.com/groups/s3/LogDelivery"
        assert g["Grantee"]["Type"] == "Group"
        assert g["Permission"] in ["WRITE", "READ_ACP"]

    # With one:
    s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
        "Grants": [
            {
                "Grantee": {
                    "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
                    "Type": "Group"
                },
                "Permission": "WRITE"
            }
        ],
        "Owner": bucket_owner
    })
    result = s3.get_bucket_acl(Bucket="bucket")
    assert len(result["Grants"]) == 1

    # With no owner:
    # The Owner element is required; omitting it must be rejected.
    with assert_raises(ClientError) as err:
        s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
            "Grants": [
                {
                    "Grantee": {
                        "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
                        "Type": "Group"
                    },
                    "Permission": "WRITE"
                }
            ]
        })
    assert err.exception.response["Error"]["Code"] == "MalformedACLError"

    # With incorrect permission:
    with assert_raises(ClientError) as err:
        s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={
            "Grants": [
                {
                    "Grantee": {
                        "URI": "http://acs.amazonaws.com/groups/s3/LogDelivery",
                        "Type": "Group"
                    },
                    "Permission": "lskjflkasdjflkdsjfalisdjflkdsjf"
                }
            ],
            "Owner": bucket_owner
        })
    assert err.exception.response["Error"]["Code"] == "MalformedACLError"

    # Clear the ACLs:
    result = s3.put_bucket_acl(Bucket="bucket", AccessControlPolicy={"Grants": [], "Owner": bucket_owner})
    assert not result.get("Grants")
@mock_s3
def test_boto3_put_bucket_logging():
    """Exercise put_bucket_logging / get_bucket_logging end to end.

    Covers: no config, nonexistent target bucket, target bucket missing the
    log-delivery ACL, cross-region target, a valid config, disabling,
    multiple/single TargetGrants, and an invalid TargetGrant permission.
    """
    s3 = boto3.client("s3", region_name="us-east-1")
    bucket_name = "mybucket"
    log_bucket = "logbucket"
    wrong_region_bucket = "wrongregionlogbucket"
    s3.create_bucket(Bucket=bucket_name)
    s3.create_bucket(Bucket=log_bucket)  # Adding the ACL for log-delivery later...
    s3.create_bucket(
        Bucket=wrong_region_bucket,
        CreateBucketConfiguration={"LocationConstraint": "us-west-2"})

    def enable_logging(target, prefix="", grants=None):
        # Point bucket_name's server-access logging at `target`.
        enabled = {"TargetBucket": target, "TargetPrefix": prefix}
        if grants is not None:
            enabled["TargetGrants"] = grants
        s3.put_bucket_logging(
            Bucket=bucket_name,
            BucketLoggingStatus={"LoggingEnabled": enabled})

    # No logging config yet:
    assert not s3.get_bucket_logging(Bucket=bucket_name).get("LoggingEnabled")

    # A log-bucket that doesn't exist:
    with assert_raises(ClientError) as err:
        enable_logging("IAMNOTREAL")
    assert err.exception.response["Error"]["Code"] == "InvalidTargetBucketForLogging"

    # A log-bucket that's missing the proper ACLs for LogDelivery:
    with assert_raises(ClientError) as err:
        enable_logging(log_bucket)
    assert err.exception.response["Error"]["Code"] == "InvalidTargetBucketForLogging"
    assert "log-delivery" in err.exception.response["Error"]["Message"]

    # Add the proper "log-delivery" ACL to both candidate log buckets:
    bucket_owner = s3.get_bucket_acl(Bucket=log_bucket)["Owner"]
    delivery_uri = "http://acs.amazonaws.com/groups/s3/LogDelivery"
    for target in (log_bucket, wrong_region_bucket):
        s3.put_bucket_acl(Bucket=target, AccessControlPolicy={
            "Grants": [
                {
                    "Grantee": {"URI": delivery_uri, "Type": "Group"},
                    "Permission": "WRITE"
                },
                {
                    "Grantee": {"URI": delivery_uri, "Type": "Group"},
                    "Permission": "READ_ACP"
                },
                {
                    "Grantee": {"Type": "CanonicalUser", "ID": bucket_owner["ID"]},
                    "Permission": "FULL_CONTROL"
                }
            ],
            "Owner": bucket_owner
        })

    # A log-bucket in a different region is rejected:
    with assert_raises(ClientError) as err:
        enable_logging(wrong_region_bucket)
    assert err.exception.response["Error"]["Code"] == "CrossLocationLoggingProhibitted"

    # Correct logging:
    prefix = "{}/".format(bucket_name)
    enable_logging(log_bucket, prefix)
    result = s3.get_bucket_logging(Bucket=bucket_name)
    assert result["LoggingEnabled"]["TargetBucket"] == log_bucket
    assert result["LoggingEnabled"]["TargetPrefix"] == prefix
    assert not result["LoggingEnabled"].get("TargetGrants")

    # And disabling:
    s3.put_bucket_logging(Bucket=bucket_name, BucketLoggingStatus={})
    assert not s3.get_bucket_logging(Bucket=bucket_name).get("LoggingEnabled")

    canonical_id = "SOMEIDSTRINGHERE9238748923734823917498237489237409123840983274"

    def target_grant(permission):
        # Build one TargetGrant for the fixed canonical user.
        return {
            "Grantee": {"ID": canonical_id, "Type": "CanonicalUser"},
            "Permission": permission
        }

    # And enabling with multiple target grants:
    enable_logging(log_bucket, prefix,
                   grants=[target_grant("READ"), target_grant("WRITE")])
    result = s3.get_bucket_logging(Bucket=bucket_name)
    assert len(result["LoggingEnabled"]["TargetGrants"]) == 2
    assert result["LoggingEnabled"]["TargetGrants"][0]["Grantee"]["ID"] == canonical_id

    # Test with just 1 grant:
    enable_logging(log_bucket, prefix, grants=[target_grant("READ")])
    result = s3.get_bucket_logging(Bucket=bucket_name)
    assert len(result["LoggingEnabled"]["TargetGrants"]) == 1

    # With an invalid grant permission:
    with assert_raises(ClientError) as err:
        enable_logging(log_bucket, prefix, grants=[target_grant("NOTAREALPERM")])
    assert err.exception.response["Error"]["Code"] == "MalformedXML"
@mock_s3 @mock_s3
def test_boto3_put_object_tagging(): def test_boto3_put_object_tagging():
s3 = boto3.client('s3', region_name='us-east-1') s3 = boto3.client('s3', region_name='us-east-1')
@ -1939,11 +2186,10 @@ def test_get_stream_gzipped():
Bucket='moto-tests', Bucket='moto-tests',
Key='keyname', Key='keyname',
) )
res = zlib.decompress(obj['Body'].read(), 16+zlib.MAX_WBITS) res = zlib.decompress(obj['Body'].read(), 16 + zlib.MAX_WBITS)
assert res == payload assert res == payload
TEST_XML = """\ TEST_XML = """\
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<ns0:WebsiteConfiguration xmlns:ns0="http://s3.amazonaws.com/doc/2006-03-01/"> <ns0:WebsiteConfiguration xmlns:ns0="http://s3.amazonaws.com/doc/2006-03-01/">