Adding server mode

This commit is contained in:
Steve Pulec 2013-03-05 08:14:43 -05:00
parent c6f5afff75
commit a728b2581a
31 changed files with 489 additions and 66 deletions

4
.coveragerc Normal file
View File

@ -0,0 +1,4 @@
[report]
exclude_lines =
if __name__ == .__main__.:

View File

@ -61,7 +61,32 @@ It gets even better! Moto isn't just S3. Here's the status of the other AWS serv
* SES (@mock_ses) - core done
* SQS (@mock_sqs) - core done
This library has been tested on boto v2.5+.
For example, imagine you have a function that you use to launch new ec2 instances:
```python
import boto
def add_servers(ami_id, count):
conn = boto.connect_ec2('the_key', 'the_secret')
for index in range(count):
conn.run_instances(ami_id)
```
To test it:
```python
from . import add_servers
@mock_ec2
def test_add_servers():
add_servers('ami-1234abcd', 2)
conn = boto.connect_ec2('the_key', 'the_secret')
reservations = conn.get_all_instances()
assert len(reservations) == 2
instance1 = reservations[0].instances[0]
assert instance1.image_id == 'ami-1234abcd'
```
## Usage
@ -108,8 +133,28 @@ def test_my_model_save():
mock.stop()
```
## Stand-alone Server Mode
Moto also comes with a stand-alone server mode. This allows you to utilize the backend structure of Moto even if you don't use Python.
To run a service:
```console
$ moto_server ec2
* Running on http://127.0.0.1:5000/
```
Then go to [localhost](http://localhost:5000/?Action=DescribeInstances) to see a list of running instances (it will be empty since you haven't added any yet).
## Install
```console
$ pip install moto
```
This library has been tested on boto v2.5+.
## Thanks
A huge thanks to [Gabriel Falcão](https://github.com/gabrielfalcao) and his [HTTPretty](https://github.com/gabrielfalcao/HTTPretty) library. Moto would not exist without it.

View File

@ -2,6 +2,7 @@ import functools
import re
from moto.packages.httpretty import HTTPretty
from .utils import convert_regex_to_flask_path
class MockAWS(object):
@ -48,13 +49,56 @@ class BaseBackend(object):
self.__init__()
@property
def urls(self):
def _url_module(self):
backend_module = self.__class__.__module__
backend_urls_module_name = backend_module.replace("models", "urls")
backend_urls_module = __import__(backend_urls_module_name, fromlist=['urls'])
urls = backend_urls_module.urls
backend_urls_module = __import__(backend_urls_module_name, fromlist=['url_bases', 'url_paths'])
return backend_urls_module
@property
def urls(self):
"""
A dictionary of the urls to be mocked with this service and the handlers
that should be called in their place
"""
url_bases = self._url_module.url_bases
unformatted_paths = self._url_module.url_paths
urls = {}
for url_base in url_bases:
for url_path, handler in unformatted_paths.iteritems():
url = url_path.format(url_base)
urls[url] = handler
return urls
@property
def url_paths(self):
"""
A dictionary of the paths of the urls to be mocked with this service and
the handlers that should be called in their place
"""
unformatted_paths = self._url_module.url_paths
paths = {}
for unformatted_path, handler in unformatted_paths.iteritems():
path = unformatted_path.format("")
paths[path] = handler
return paths
@property
def flask_paths(self):
"""
The url paths that will be used for the flask server
"""
paths = {}
for url_path, handler in self.url_paths.iteritems():
url_path = convert_regex_to_flask_path(url_path)
paths[url_path] = handler
return paths
def decorator(self, func=None):
if func:
return MockAWS(self)(func)

View File

@ -4,6 +4,9 @@ from moto.core.utils import headers_to_dict, camelcase_to_underscores, method_na
class BaseResponse(object):
def dispatch2(self, uri, body, headers):
return self.dispatch(uri, body, headers)
def dispatch(self, uri, body, headers):
if body:
querystring = parse_qs(body)
@ -13,7 +16,7 @@ class BaseResponse(object):
self.path = uri.path
self.querystring = querystring
action = querystring['Action'][0]
action = querystring.get('Action', [""])[0]
action = camelcase_to_underscores(action)
method_names = method_names_from_class(self.__class__)

View File

@ -1,9 +1,17 @@
from collections import namedtuple
import inspect
import random
import re
from urlparse import parse_qs
from flask import request
def headers_to_dict(headers):
if isinstance(headers, dict):
# If already dict, return
return headers
result = {}
for index, header in enumerate(headers.split("\r\n")):
if not header:
@ -51,3 +59,55 @@ def get_random_hex(length=8):
def get_random_message_id():
return '{}-{}-{}-{}-{}'.format(get_random_hex(8), get_random_hex(4), get_random_hex(4), get_random_hex(4), get_random_hex(12))
def convert_regex_to_flask_path(url_path):
"""
Converts a regex matching url to one that can be used with flask
"""
for token in ["$"]:
url_path = url_path.replace(token, "")
def caller(reg):
match_name, match_pattern = reg.groups()
return '<regex("{0}"):{1}>'.format(match_pattern, match_name)
url_path = re.sub("\(\?P<(.*?)>(.*?)\)", caller, url_path)
return url_path
class convert_flask_to_httpretty_response(object):
def __init__(self, callback):
self.callback = callback
@property
def __name__(self):
# For instance methods, use class and method names. Otherwise
# use module and method name
if inspect.ismethod(self.callback):
outer = self.callback.im_class.__name__
else:
outer = self.callback.__module__
return "{}.{}".format(outer, self.callback.__name__)
def __call__(self, args=None, **kwargs):
hostname = request.host_url
method = request.method
path = request.path
query = request.query_string
# Mimic the HTTPretty URIInfo class
URI = namedtuple('URI', 'hostname method path query')
uri = URI(hostname, method, path, query)
body = request.data or query
headers = dict(request.headers)
result = self.callback(uri, body, headers)
if isinstance(result, basestring):
# result is just the response
return result
else:
# result is a responce, headers tuple
response, headers = result
status = headers.pop('status', None)
return response, status, headers

View File

@ -1,6 +1,6 @@
import re
import json
from moto.core.utils import headers_to_dict
from .models import dynamodb_backend
@ -17,12 +17,16 @@ class DynamoHandler(object):
ie: X-Amz-Target: DynamoDB_20111205.ListTables -> ListTables
"""
match = re.search(r'X-Amz-Target: \w+\.(\w+)', headers)
return match.groups()[0]
match = headers.get('X-Amz-Target')
if match:
return match.split(".")[1]
def dispatch(self):
method = self.get_method_name(self.headers)
return getattr(self, method)(self.uri, self.body, self.headers)
if method:
return getattr(self, method)(self.uri, self.body, self.headers)
else:
return "", dict(status=404)
def ListTables(self, uri, body, headers):
tables = dynamodb_backend.tables.keys()
@ -36,4 +40,4 @@ class DynamoHandler(object):
def handler(uri, body, headers):
return DynamoHandler(uri, body, headers).dispatch()
return DynamoHandler(uri, body, headers_to_dict(headers)).dispatch()

View File

@ -4,9 +4,13 @@ from .responses import handler
def sts_handler(uri, body, headers):
return GET_SESSION_TOKEN_RESULT
urls = {
"https?://dynamodb.us-east-1.amazonaws.com/": handler,
"https?://sts.amazonaws.com/": sts_handler,
url_bases = [
"https?://dynamodb.us-east-1.amazonaws.com",
"https?://sts.amazonaws.com",
]
url_paths = {
"{0}/": handler,
}

View File

@ -31,12 +31,13 @@ class InstanceBackend(object):
if instance.id == instance_id:
return instance
def add_instances(self, count):
def add_instances(self, image_id, count):
new_reservation = Reservation()
new_reservation.id = random_reservation_id()
for index in range(count):
new_instance = Instance()
new_instance.id = random_instance_id()
new_instance.image_id = image_id
new_instance._state_name = "pending"
new_instance._state_code = 0
new_reservation.instances.append(new_instance)
@ -226,11 +227,11 @@ class SecurityRule(object):
@property
def unique_representation(self):
return "{}-{}-{}-{}-{}".format(
self.ip_protocol,
self.from_port,
self.to_port,
self.ip_ranges,
self.source_groups
self.ip_protocol,
self.from_port,
self.to_port,
self.ip_ranges,
self.source_groups
)
def __eq__(self, other):

View File

@ -71,8 +71,9 @@ class EC2Response(object):
else:
querystring = parse_qs(headers)
action = querystring['Action'][0]
action = camelcase_to_underscores(action)
action = querystring.get('Action', [None])[0]
if action:
action = camelcase_to_underscores(action)
for sub_response in self.sub_responses:
method_names = method_names_from_class(sub_response)

View File

@ -16,7 +16,8 @@ class InstanceResponse(object):
def run_instances(self):
min_count = int(self.querystring.get('MinCount', ['1'])[0])
new_reservation = ec2_backend.add_instances(min_count)
image_id = self.querystring.get('ImageId')[0]
new_reservation = ec2_backend.add_instances(image_id, min_count)
template = Template(EC2_RUN_INSTANCES)
return template.render(reservation=new_reservation)
@ -75,7 +76,7 @@ EC2_RUN_INSTANCES = """<RunInstancesResponse xmlns="http://ec2.amazonaws.com/doc
{% for instance in reservation.instances %}
<item>
<instanceId>{{ instance.id }}</instanceId>
<imageId>ami-60a54009</imageId>
<imageId>{{ instance.image_id }}</imageId>
<instanceState>
<code>{{ instance._state_code }}</code>
<name>{{ instance._state_name }}</name>
@ -127,7 +128,7 @@ EC2_DESCRIBE_INSTANCES = """<DescribeInstancesResponse xmlns='http://ec2.amazona
{% for instance in reservation.instances %}
<item>
<instanceId>{{ instance.id }}</instanceId>
<imageId>ami-1a2b3c4d</imageId>
<imageId>{{ instance.image_id }}</imageId>
<instanceState>
<code>{{ instance._state_code }}</code>
<name>{{ instance._state_name }}</name>

View File

@ -1,5 +1,10 @@
from .responses import EC2Response
urls = {
"https?://ec2.us-east-1.amazonaws.com/": EC2Response().dispatch,
url_bases = [
"https?://ec2.us-east-1.amazonaws.com",
]
url_paths = {
'{0}/': EC2Response().dispatch,
}

View File

@ -61,8 +61,9 @@ class S3Backend(BaseBackend):
return new_key
def get_key(self, bucket_name, key_name):
bucket = self.buckets[bucket_name]
return bucket.keys.get(key_name)
bucket = self.get_bucket(bucket_name)
if bucket:
return bucket.keys.get(key_name)
def prefix_query(self, bucket, prefix):
key_results = set()

View File

@ -7,7 +7,7 @@ from moto.core.utils import headers_to_dict
from .utils import bucket_name_from_hostname
def all_buckets(uri, body, method):
def all_buckets():
# No bucket specified. Listing all buckets
all_buckets = s3_backend.get_all_buckets()
template = Template(S3_ALL_BUCKETS)
@ -20,6 +20,9 @@ def bucket_response(uri, body, headers):
querystring = parse_qs(uri.query)
bucket_name = bucket_name_from_hostname(hostname)
if not bucket_name:
# If no bucket specified, list all buckets
return all_buckets()
if method == 'GET':
bucket = s3_backend.get_bucket(bucket_name)
@ -27,8 +30,12 @@ def bucket_response(uri, body, headers):
prefix = querystring.get('prefix', [None])[0]
result_keys, result_folders = s3_backend.prefix_query(bucket, prefix)
template = Template(S3_BUCKET_GET_RESPONSE)
return template.render(bucket=bucket, prefix=prefix,
result_keys=result_keys, result_folders=result_folders)
return template.render(
bucket=bucket,
prefix=prefix,
result_keys=result_keys,
result_folders=result_folders
)
else:
return "", dict(status=404)
elif method == 'PUT':

View File

@ -1,9 +1,10 @@
from .responses import all_buckets, bucket_response, key_response
from .responses import bucket_response, key_response
base_url = "https?://(.*).s3.amazonaws.com"
url_bases = [
"https?://(?P<bucket_name>\w*)\.?s3.amazonaws.com"
]
urls = {
'https?://s3.amazonaws.com/$': all_buckets,
'{0}/$'.format(base_url): bucket_response,
'{}/(.+)'.format(base_url): key_response,
url_paths = {
'{0}/$': bucket_response,
'{0}/(?P<key_name>\w+)': key_response,
}

View File

@ -1,8 +1,23 @@
import re
import urlparse
bucket_name_regex = re.compile("(.+).s3.amazonaws.com")
def bucket_name_from_hostname(hostname):
bucket_result = bucket_name_regex.search(hostname)
return bucket_result.groups()[0]
if 'amazonaws.com' in hostname:
bucket_result = bucket_name_regex.search(hostname)
if bucket_result:
return bucket_result.groups()[0]
else:
# In server mode. Use left-most part of subdomain for bucket name
split_url = urlparse.urlparse(hostname)
# If 'www' prefixed, strip it.
clean_hostname = split_url.netloc.lstrip("www.")
if '.' in clean_hostname:
return clean_hostname.split(".")[0]
else:
# No subdomain found.
return None

45
moto/server.py Normal file
View File

@ -0,0 +1,45 @@
import sys
from flask import Flask
from werkzeug.routing import BaseConverter
from moto.dynamodb import dynamodb_backend # flake8: noqa
from moto.ec2 import ec2_backend # flake8: noqa
from moto.s3 import s3_backend # flake8: noqa
from moto.ses import ses_backend # flake8: noqa
from moto.sqs import sqs_backend # flake8: noqa
from moto.core.utils import convert_flask_to_httpretty_response
app = Flask(__name__)
HTTP_METHODS = ["GET", "POST", "PUT", "DELETE", "HEAD"]
class RegexConverter(BaseConverter):
# http://werkzeug.pocoo.org/docs/routing/#custom-converters
def __init__(self, url_map, *items):
super(RegexConverter, self).__init__(url_map)
self.regex = items[0]
def configure_urls(service):
backend = globals()["{}_backend".format(service)]
from werkzeug.routing import Map
app.url_map = Map()
app.url_map.converters['regex'] = RegexConverter
for url_path, handler in backend.flask_paths.iteritems():
app.route(url_path, methods=HTTP_METHODS)(convert_flask_to_httpretty_response(handler))
def main(args=sys.argv):
if len(args) != 2:
print("Usage: moto_server <service>")
sys.exit(1)
service_name = args[1]
configure_urls(service_name)
app.testing = True
app.run()
if __name__ == '__main__':
main()

View File

@ -1,7 +1,9 @@
from .responses import EmailResponse
base_url = "https?://email.us-east-1.amazonaws.com"
url_bases = [
"https?://email.us-east-1.amazonaws.com"
]
urls = {
'{0}/$'.format(base_url): EmailResponse().dispatch,
url_paths = {
'{0}/$': EmailResponse().dispatch,
}

View File

@ -1,8 +1,10 @@
from .responses import QueueResponse, QueuesResponse
base_url = "https?://(.*).amazonaws.com"
url_bases = [
"https?://(.*).amazonaws.com"
]
urls = {
'{0}/$'.format(base_url): QueuesResponse().dispatch,
'{0}/(\d+)/(.*)$'.format(base_url): QueueResponse().dispatch,
url_paths = {
'{0}/$': QueuesResponse().dispatch2,
'{0}/(?P<account_id>\d+)/(?P<queue_name>\w+)': QueueResponse().dispatch,
}

View File

@ -1,8 +1,6 @@
boto
coverage
freezegun
#httpretty
Jinja2
mock
nose
requests

View File

@ -6,13 +6,19 @@ setup(
name='moto',
version='0.0.6',
description='Moto is a library that allows your python tests to easily mock'
' out the boto library',
' out the boto library',
author='Steve Pulec',
author_email='spulec@gmail',
url='https://github.com/spulec/moto',
entry_points={
'console_scripts': [
'moto_server = moto.server:main',
],
},
packages=find_packages(),
install_requires=[
"boto",
"Jinja2",
"flask",
],
)

View File

@ -0,0 +1,19 @@
from mock import patch
import sure # flake8: noqa
from moto.server import main
def test_wrong_arguments():
try:
main(["name", "test1", "test2"])
assert False, ("main() when called with the incorrect number of args"
" should raise a system exit")
except SystemExit:
pass
@patch('moto.server.app.run')
def test_right_arguments(app_run):
main(["name", "s3"])
app_run.assert_called_once_with()

View File

@ -0,0 +1,20 @@
import sure # flake8: noqa
from moto.core.utils import convert_regex_to_flask_path
def test_flask_path_converting_simple():
convert_regex_to_flask_path("/").should.equal("/")
convert_regex_to_flask_path("/$").should.equal("/")
convert_regex_to_flask_path("/foo").should.equal("/foo")
convert_regex_to_flask_path("/foo/bar/").should.equal("/foo/bar/")
def test_flask_path_converting_regex():
convert_regex_to_flask_path("/(?P<key_name>\w+)").should.equal('/<regex("\w+"):key_name>')
convert_regex_to_flask_path("(?P<account_id>\d+)/(?P<queue_name>.*)$").should.equal(
'<regex("\d+"):account_id>/<regex(".*"):queue_name>'
)

View File

@ -0,0 +1,18 @@
import sure # flake8: noqa
import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("dynamodb")
def test_table_list():
test_client = server.app.test_client()
res = test_client.get('/')
res.status_code.should.equal(404)
headers = {'X-Amz-Target': 'TestTable.ListTables'}
res = test_client.get('/', headers=headers)
res.data.should.contain('TableNames')

View File

@ -9,7 +9,7 @@ from moto import mock_ec2
@mock_ec2
def test_ami_create_and_delete():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
image = conn.create_image(instance.id, "test-ami", "this is a test ami")
@ -23,13 +23,14 @@ def test_ami_create_and_delete():
@mock_ec2
def test_ami_create_from_missing_instance():
conn = boto.connect_ec2('the_key', 'the_secret')
conn.create_image.when.called_with("i-abcdefg", "test-ami", "this is a test ami").should.throw(EC2ResponseError)
args = ["i-abcdefg", "test-ami", "this is a test ami"]
conn.create_image.when.called_with(*args).should.throw(EC2ResponseError)
@mock_ec2
def test_ami_pulls_attributes_from_instance():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.modify_attribute("kernel", "test-kernel")

View File

@ -1,7 +1,6 @@
import boto
from boto.exception import EC2ResponseError
from sure import expect
import sure # flake8: noqa
from moto import mock_ec2
@ -28,7 +27,7 @@ def test_create_and_delete_volume():
@mock_ec2
def test_volume_attach_and_detach():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
volume = conn.create_volume(80, "us-east-1a")
@ -47,7 +46,8 @@ def test_volume_attach_and_detach():
volume.update()
volume.volume_state().should.equal('available')
conn.detach_volume.when.called_with(volume.id, instance.id, "/dev/sdh").should.throw(EC2ResponseError)
conn.detach_volume.when.called_with(
volume.id, instance.id, "/dev/sdh").should.throw(EC2ResponseError)
@mock_ec2

View File

@ -1,14 +1,34 @@
import boto
from boto.ec2.instance import Reservation, InstanceAttribute
from sure import expect
import sure # flake8: noqa
from moto import mock_ec2
################ Test Readme ###############
def add_servers(ami_id, count):
conn = boto.connect_ec2('the_key', 'the_secret')
for index in range(count):
conn.run_instances(ami_id)
@mock_ec2
def test_add_servers():
add_servers('ami-1234abcd', 2)
conn = boto.connect_ec2('the_key', 'the_secret')
reservations = conn.get_all_instances()
assert len(reservations) == 2
instance1 = reservations[0].instances[0]
assert instance1.image_id == 'ami-1234abcd'
############################################
@mock_ec2
def test_instance_launch_and_terminate():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
reservation.should.be.a(Reservation)
reservation.instances.should.have.length_of(1)
instance = reservation.instances[0]
@ -31,11 +51,12 @@ def test_instance_launch_and_terminate():
@mock_ec2
def test_instance_start_and_stop():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>', min_count=2)
reservation = conn.run_instances('ami-1234abcd', min_count=2)
instances = reservation.instances
instances.should.have.length_of(2)
stopped_instances = conn.stop_instances([instance.id for instance in instances])
instance_ids = [instance.id for instance in instances]
stopped_instances = conn.stop_instances(instance_ids)
for instance in stopped_instances:
instance.state.should.equal('stopping')
@ -47,7 +68,7 @@ def test_instance_start_and_stop():
@mock_ec2
def test_instance_reboot():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.reboot()
instance.state.should.equal('pending')
@ -56,7 +77,7 @@ def test_instance_reboot():
@mock_ec2
def test_instance_attribute_instance_type():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.modify_attribute("instanceType", "m1.small")
@ -69,11 +90,11 @@ def test_instance_attribute_instance_type():
@mock_ec2
def test_instance_attribute_user_data():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.modify_attribute("userData", "this is my user data")
instance_attribute = instance.get_attribute("userData")
instance_attribute.should.be.a(InstanceAttribute)
expect(instance_attribute.get("userData")).should.equal("this is my user data")
instance_attribute.get("userData").should.equal("this is my user data")

View File

@ -0,0 +1,20 @@
import re
import sure # flake8: noqa
import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("ec2")
def test_ec2_server_get():
test_client = server.app.test_client()
res = test_client.get('/?Action=RunInstances&ImageId=ami-60a54009')
groups = re.search("<instanceId>(.*)</instanceId>", res.data)
instance_id = groups.groups()[0]
res = test_client.get('/?Action=DescribeInstances')
res.data.should.contain(instance_id)

View File

@ -1,5 +1,5 @@
import boto
from sure import expect
import sure # flake8: noqa
from moto import mock_ec2
@ -7,7 +7,7 @@ from moto import mock_ec2
@mock_ec2
def test_instance_launch_and_terminate():
conn = boto.connect_ec2('the_key', 'the_secret')
reservation = conn.run_instances('<ami-image-id>')
reservation = conn.run_instances('ami-1234abcd')
instance = reservation.instances[0]
instance.add_tag("a key", "some value")

View File

@ -0,0 +1,35 @@
import sure # flake8: noqa
import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("s3")
def test_s3_server_get():
test_client = server.app.test_client()
res = test_client.get('/')
res.data.should.contain('ListAllMyBucketsResult')
def test_s3_server_bucket_create():
test_client = server.app.test_client()
res = test_client.put('/', 'http://foobar.localhost:5000/')
res.status_code.should.equal(200)
res = test_client.get('/')
res.data.should.contain('<Name>foobar</Name>')
res = test_client.get('/', 'http://foobar.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.contain("ListBucketResult")
res = test_client.put('/bar', 'http://foobar.localhost:5000/', data='test value')
res.status_code.should.equal(200)
res = test_client.get('/bar', 'http://foobar.localhost:5000/')
res.status_code.should.equal(200)
res.data.should.equal("test value")

View File

@ -0,0 +1,14 @@
import sure # flake8: noqa
import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("ses")
def test_ses_list_identities():
test_client = server.app.test_client()
res = test_client.get('/?Action=ListIdentities')
res.data.should.contain("ListIdentitiesResponse")

View File

@ -0,0 +1,26 @@
import base64
import re
import sure # flake8: noqa
import moto.server as server
'''
Test the different server responses
'''
server.configure_urls("sqs")
def test_ses_list_identities():
test_client = server.app.test_client()
res = test_client.get('/?Action=ListQueues')
res.data.should.contain("ListQueuesResponse")
res = test_client.put('/?Action=CreateQueue&QueueName=testqueue')
res = test_client.put(
'/123/testqueue?MessageBody=test-message&Action=SendMessage')
res = test_client.get(
'/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1')
message = re.search("<Body>(.*?)</Body>", res.data).groups()[0]
base64.decodestring(message).should.equal('test-message')