Merge pull request #140 from modestinc/improved_sqs_mocks
Enhanced SQS support
This commit is contained in:
commit
2274215b32
8
moto/sqs/exceptions.py
Normal file
8
moto/sqs/exceptions.py
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
class MessageNotInflight(Exception):
|
||||||
|
description = "The message referred to is not in flight."
|
||||||
|
status_code = 400
|
||||||
|
|
||||||
|
|
||||||
|
class ReceiptHandleIsInvalid(Exception):
|
||||||
|
description = "The receipt handle provided is not valid."
|
||||||
|
status_code = 400
|
@ -1,18 +1,30 @@
|
|||||||
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from moto.core import BaseBackend
|
from moto.core import BaseBackend
|
||||||
from moto.core.utils import camelcase_to_underscores, get_random_message_id
|
from moto.core.utils import camelcase_to_underscores, get_random_message_id
|
||||||
from .utils import generate_receipt_handle
|
from .utils import generate_receipt_handle, unix_time_millis
|
||||||
|
from .exceptions import (
|
||||||
|
ReceiptHandleIsInvalid,
|
||||||
|
MessageNotInflight
|
||||||
|
)
|
||||||
|
|
||||||
|
DEFAULT_ACCOUNT_ID = 123456789012
|
||||||
|
|
||||||
|
|
||||||
class Message(object):
|
class Message(object):
|
||||||
|
|
||||||
def __init__(self, message_id, body):
|
def __init__(self, message_id, body):
|
||||||
self.id = message_id
|
self.id = message_id
|
||||||
self.body = body
|
self.body = body
|
||||||
self.receipt_handle = generate_receipt_handle()
|
self.receipt_handle = None
|
||||||
|
self.sender_id = DEFAULT_ACCOUNT_ID
|
||||||
|
self.sent_timestamp = None
|
||||||
|
self.approximate_first_receive_timestamp = None
|
||||||
|
self.approximate_receive_count = 0
|
||||||
|
self.visible_at = 0
|
||||||
|
self.delayed_until = 0
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def md5(self):
|
def md5(self):
|
||||||
@ -20,6 +32,56 @@ class Message(object):
|
|||||||
body_md5.update(self.body)
|
body_md5.update(self.body)
|
||||||
return body_md5.hexdigest()
|
return body_md5.hexdigest()
|
||||||
|
|
||||||
|
def mark_sent(self, delay_seconds=None):
|
||||||
|
self.sent_timestamp = unix_time_millis()
|
||||||
|
if delay_seconds:
|
||||||
|
self.delay(delay_seconds=delay_seconds)
|
||||||
|
|
||||||
|
def mark_received(self, visibility_timeout=None):
|
||||||
|
"""
|
||||||
|
When a message is received we will set the first receive timestamp,
|
||||||
|
tap the ``approximate_receive_count`` and the ``visible_at`` time.
|
||||||
|
"""
|
||||||
|
if visibility_timeout:
|
||||||
|
visibility_timeout = int(visibility_timeout)
|
||||||
|
else:
|
||||||
|
visibility_timeout = 0
|
||||||
|
|
||||||
|
if not self.approximate_first_receive_timestamp:
|
||||||
|
self.approximate_first_receive_timestamp = unix_time_millis()
|
||||||
|
|
||||||
|
self.approximate_receive_count += 1
|
||||||
|
|
||||||
|
# Make message visible again in the future unless its
|
||||||
|
# destroyed.
|
||||||
|
if visibility_timeout:
|
||||||
|
self.change_visibility(visibility_timeout)
|
||||||
|
|
||||||
|
self.receipt_handle = generate_receipt_handle()
|
||||||
|
|
||||||
|
def change_visibility(self, visibility_timeout):
|
||||||
|
# We're dealing with milliseconds internally
|
||||||
|
visibility_timeout_msec = int(visibility_timeout) * 1000
|
||||||
|
self.visible_at = unix_time_millis() + visibility_timeout_msec
|
||||||
|
|
||||||
|
def delay(self, delay_seconds):
|
||||||
|
delay_msec = int(delay_seconds) * 1000
|
||||||
|
self.delayed_until = unix_time_millis() + delay_msec
|
||||||
|
|
||||||
|
@property
|
||||||
|
def visible(self):
|
||||||
|
current_time = unix_time_millis()
|
||||||
|
if current_time > self.visible_at:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def delayed(self):
|
||||||
|
current_time = unix_time_millis()
|
||||||
|
if current_time < self.delayed_until:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class Queue(object):
|
class Queue(object):
|
||||||
camelcase_attributes = ['ApproximateNumberOfMessages',
|
camelcase_attributes = ['ApproximateNumberOfMessages',
|
||||||
@ -37,12 +99,10 @@ class Queue(object):
|
|||||||
def __init__(self, name, visibility_timeout):
|
def __init__(self, name, visibility_timeout):
|
||||||
self.name = name
|
self.name = name
|
||||||
self.visibility_timeout = visibility_timeout or 30
|
self.visibility_timeout = visibility_timeout or 30
|
||||||
self.messages = []
|
self._messages = []
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
|
||||||
self.approximate_number_of_messages_delayed = 0
|
|
||||||
self.approximate_number_of_messages_not_visible = 0
|
|
||||||
self.created_timestamp = now
|
self.created_timestamp = now
|
||||||
self.delay_seconds = 0
|
self.delay_seconds = 0
|
||||||
self.last_modified_timestamp = now
|
self.last_modified_timestamp = now
|
||||||
@ -60,6 +120,18 @@ class Queue(object):
|
|||||||
visibility_timeout=properties.get('VisibilityTimeout'),
|
visibility_timeout=properties.get('VisibilityTimeout'),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def approximate_number_of_messages_delayed(self):
|
||||||
|
return len([m for m in self._messages if m.delayed])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def approximate_number_of_messages_not_visible(self):
|
||||||
|
return len([m for m in self._messages if not m.visible])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def approximate_number_of_messages(self):
|
||||||
|
return len(self.messages)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def physical_resource_id(self):
|
def physical_resource_id(self):
|
||||||
return self.name
|
return self.name
|
||||||
@ -72,12 +144,14 @@ class Queue(object):
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def approximate_number_of_messages(self):
|
def messages(self):
|
||||||
return len(self.messages)
|
return [message for message in self._messages if message.visible and not message.delayed]
|
||||||
|
|
||||||
|
def add_message(self, message):
|
||||||
|
self._messages.append(message)
|
||||||
|
|
||||||
|
|
||||||
class SQSBackend(BaseBackend):
|
class SQSBackend(BaseBackend):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.queues = {}
|
self.queues = {}
|
||||||
super(SQSBackend, self).__init__()
|
super(SQSBackend, self).__init__()
|
||||||
@ -112,28 +186,66 @@ class SQSBackend(BaseBackend):
|
|||||||
return queue
|
return queue
|
||||||
|
|
||||||
def send_message(self, queue_name, message_body, delay_seconds=None):
|
def send_message(self, queue_name, message_body, delay_seconds=None):
|
||||||
# TODO impemented delay_seconds
|
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
|
|
||||||
|
if delay_seconds:
|
||||||
|
delay_seconds = int(delay_seconds)
|
||||||
|
else:
|
||||||
|
delay_seconds = queue.delay_seconds
|
||||||
|
|
||||||
message_id = get_random_message_id()
|
message_id = get_random_message_id()
|
||||||
message = Message(message_id, message_body)
|
message = Message(message_id, message_body)
|
||||||
queue.messages.append(message)
|
|
||||||
|
message.mark_sent(
|
||||||
|
delay_seconds=delay_seconds
|
||||||
|
)
|
||||||
|
|
||||||
|
queue.add_message(message)
|
||||||
return message
|
return message
|
||||||
|
|
||||||
def receive_messages(self, queue_name, count):
|
def receive_messages(self, queue_name, count):
|
||||||
|
"""
|
||||||
|
Attempt to retrieve visible messages from a queue.
|
||||||
|
|
||||||
|
If a message was read by client and not deleted it is considered to be
|
||||||
|
"inflight" and cannot be read. We make attempts to obtain ``count``
|
||||||
|
messages but we may return less if messages are in-flight or there
|
||||||
|
are simple not enough messages in the queue.
|
||||||
|
|
||||||
|
:param string queue_name: The name of the queue to read from.
|
||||||
|
:param int count: The maximum amount of messages to retrieve.
|
||||||
|
"""
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
result = []
|
result = []
|
||||||
for _ in range(count):
|
# queue.messages only contains visible messages
|
||||||
if queue.messages:
|
for message in queue.messages:
|
||||||
result.append(queue.messages.pop(0))
|
message.mark_received(
|
||||||
|
visibility_timeout=queue.visibility_timeout
|
||||||
|
)
|
||||||
|
result.append(message)
|
||||||
|
if len(result) >= count:
|
||||||
|
break
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def delete_message(self, queue_name, receipt_handle):
|
def delete_message(self, queue_name, receipt_handle):
|
||||||
queue = self.get_queue(queue_name)
|
queue = self.get_queue(queue_name)
|
||||||
new_messages = [
|
new_messages = []
|
||||||
message for message in queue.messages
|
for message in queue._messages:
|
||||||
if message.receipt_handle != receipt_handle
|
# Only delete message if it is not visible and the reciept_handle
|
||||||
]
|
# matches.
|
||||||
queue.message = new_messages
|
if not message.visible and message.receipt_handle == receipt_handle:
|
||||||
|
continue
|
||||||
|
new_messages.append(message)
|
||||||
|
queue._messages = new_messages
|
||||||
|
|
||||||
|
def change_message_visibility(self, queue_name, receipt_handle, visibility_timeout):
|
||||||
|
queue = self.get_queue(queue_name)
|
||||||
|
for message in queue._messages:
|
||||||
|
if message.receipt_handle == receipt_handle:
|
||||||
|
if message.visible:
|
||||||
|
raise MessageNotInflight
|
||||||
|
message.change_visibility(visibility_timeout)
|
||||||
|
return
|
||||||
|
raise ReceiptHandleIsInvalid
|
||||||
|
|
||||||
sqs_backend = SQSBackend()
|
sqs_backend = SQSBackend()
|
||||||
|
@ -3,6 +3,12 @@ from jinja2 import Template
|
|||||||
from moto.core.responses import BaseResponse
|
from moto.core.responses import BaseResponse
|
||||||
from moto.core.utils import camelcase_to_underscores
|
from moto.core.utils import camelcase_to_underscores
|
||||||
from .models import sqs_backend
|
from .models import sqs_backend
|
||||||
|
from .exceptions import (
|
||||||
|
MessageNotInflight,
|
||||||
|
ReceiptHandleIsInvalid
|
||||||
|
)
|
||||||
|
|
||||||
|
MAXIMUM_VISIBILTY_TIMEOUT = 43200
|
||||||
|
|
||||||
|
|
||||||
class QueuesResponse(BaseResponse):
|
class QueuesResponse(BaseResponse):
|
||||||
@ -34,6 +40,28 @@ class QueuesResponse(BaseResponse):
|
|||||||
|
|
||||||
|
|
||||||
class QueueResponse(BaseResponse):
|
class QueueResponse(BaseResponse):
|
||||||
|
def change_message_visibility(self):
|
||||||
|
queue_name = self.path.split("/")[-1]
|
||||||
|
receipt_handle = self.querystring.get("ReceiptHandle")[0]
|
||||||
|
visibility_timeout = int(self.querystring.get("VisibilityTimeout")[0])
|
||||||
|
|
||||||
|
if visibility_timeout > MAXIMUM_VISIBILTY_TIMEOUT:
|
||||||
|
return "Invalid request, maximum visibility timeout is {0}".format(
|
||||||
|
MAXIMUM_VISIBILTY_TIMEOUT
|
||||||
|
), dict(status=400)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sqs_backend.change_message_visibility(
|
||||||
|
queue_name=queue_name,
|
||||||
|
receipt_handle=receipt_handle,
|
||||||
|
visibility_timeout=visibility_timeout
|
||||||
|
)
|
||||||
|
except (ReceiptHandleIsInvalid, MessageNotInflight) as e:
|
||||||
|
return "Invalid request: {0}".format(e.description), dict(status=e.status_code)
|
||||||
|
|
||||||
|
template = Template(CHANGE_MESSAGE_VISIBILITY_RESPONSE)
|
||||||
|
return template.render()
|
||||||
|
|
||||||
def get_queue_attributes(self):
|
def get_queue_attributes(self):
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self.path.split("/")[-1]
|
||||||
queue = sqs_backend.get_queue(queue_name)
|
queue = sqs_backend.get_queue(queue_name)
|
||||||
@ -57,8 +85,19 @@ class QueueResponse(BaseResponse):
|
|||||||
|
|
||||||
def send_message(self):
|
def send_message(self):
|
||||||
message = self.querystring.get("MessageBody")[0]
|
message = self.querystring.get("MessageBody")[0]
|
||||||
|
delay_seconds = self.querystring.get('DelaySeconds')
|
||||||
|
|
||||||
|
if delay_seconds:
|
||||||
|
delay_seconds = int(delay_seconds[0])
|
||||||
|
else:
|
||||||
|
delay_seconds = 0
|
||||||
|
|
||||||
queue_name = self.path.split("/")[-1]
|
queue_name = self.path.split("/")[-1]
|
||||||
message = sqs_backend.send_message(queue_name, message)
|
message = sqs_backend.send_message(
|
||||||
|
queue_name,
|
||||||
|
message,
|
||||||
|
delay_seconds=delay_seconds
|
||||||
|
)
|
||||||
template = Template(SEND_MESSAGE_RESPONSE)
|
template = Template(SEND_MESSAGE_RESPONSE)
|
||||||
return template.render(message=message)
|
return template.render(message=message)
|
||||||
|
|
||||||
@ -138,7 +177,8 @@ class QueueResponse(BaseResponse):
|
|||||||
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
|
message_count = int(self.querystring.get("MaxNumberOfMessages")[0])
|
||||||
messages = sqs_backend.receive_messages(queue_name, message_count)
|
messages = sqs_backend.receive_messages(queue_name, message_count)
|
||||||
template = Template(RECEIVE_MESSAGE_RESPONSE)
|
template = Template(RECEIVE_MESSAGE_RESPONSE)
|
||||||
return template.render(messages=messages)
|
output = template.render(messages=messages)
|
||||||
|
return output
|
||||||
|
|
||||||
|
|
||||||
CREATE_QUEUE_RESPONSE = """<CreateQueueResponse>
|
CREATE_QUEUE_RESPONSE = """<CreateQueueResponse>
|
||||||
@ -226,18 +266,26 @@ RECEIVE_MESSAGE_RESPONSE = """<ReceiveMessageResponse>
|
|||||||
<ReceiveMessageResult>
|
<ReceiveMessageResult>
|
||||||
{% for message in messages %}
|
{% for message in messages %}
|
||||||
<Message>
|
<Message>
|
||||||
<MessageId>
|
<MessageId>{{ message.id }}</MessageId>
|
||||||
{{ message.id }}
|
<ReceiptHandle>{{ message.receipt_handle }}</ReceiptHandle>
|
||||||
</MessageId>
|
<MD5OfBody>{{ message.md5 }}</MD5OfBody>
|
||||||
<ReceiptHandle>
|
|
||||||
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
|
|
||||||
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
|
|
||||||
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
|
|
||||||
</ReceiptHandle>
|
|
||||||
<MD5OfBody>
|
|
||||||
{{ message.md5 }}
|
|
||||||
</MD5OfBody>
|
|
||||||
<Body>{{ message.body }}</Body>
|
<Body>{{ message.body }}</Body>
|
||||||
|
<Attribute>
|
||||||
|
<Name>SenderId</Name>
|
||||||
|
<Value>{{ message.sender_id }}</Value>
|
||||||
|
</Attribute>
|
||||||
|
<Attribute>
|
||||||
|
<Name>SentTimestamp</Name>
|
||||||
|
<Value>{{ message.sent_timestamp }}</Value>
|
||||||
|
</Attribute>
|
||||||
|
<Attribute>
|
||||||
|
<Name>ApproximateReceiveCount</Name>
|
||||||
|
<Value>{{ message.approximate_receive_count }}</Value>
|
||||||
|
</Attribute>
|
||||||
|
<Attribute>
|
||||||
|
<Name>ApproximateFirstReceiveTimestamp</Name>
|
||||||
|
<Value>{{ message.approximate_first_receive_timestamp }}</Value>
|
||||||
|
</Attribute>
|
||||||
</Message>
|
</Message>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</ReceiveMessageResult>
|
</ReceiveMessageResult>
|
||||||
@ -283,3 +331,11 @@ DELETE_MESSAGE_BATCH_RESPONSE = """<DeleteMessageBatchResponse>
|
|||||||
<RequestId>d6f86b7a-74d1-4439-b43f-196a1e29cd85</RequestId>
|
<RequestId>d6f86b7a-74d1-4439-b43f-196a1e29cd85</RequestId>
|
||||||
</ResponseMetadata>
|
</ResponseMetadata>
|
||||||
</DeleteMessageBatchResponse>"""
|
</DeleteMessageBatchResponse>"""
|
||||||
|
|
||||||
|
CHANGE_MESSAGE_VISIBILITY_RESPONSE = """<ChangeMessageVisibilityResponse>
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>
|
||||||
|
6a7a282a-d013-4a59-aba9-335b0fa48bed
|
||||||
|
</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</ChangeMessageVisibilityResponse>"""
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
import datetime
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
|
|
||||||
@ -6,3 +7,14 @@ def generate_receipt_handle():
|
|||||||
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
|
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
|
||||||
length = 185
|
length = 185
|
||||||
return ''.join(random.choice(string.lowercase) for x in range(length))
|
return ''.join(random.choice(string.lowercase) for x in range(length))
|
||||||
|
|
||||||
|
|
||||||
|
def unix_time(dt=None):
|
||||||
|
dt = dt or datetime.datetime.utcnow()
|
||||||
|
epoch = datetime.datetime.utcfromtimestamp(0)
|
||||||
|
delta = dt - epoch
|
||||||
|
return (delta.days * 86400) + (delta.seconds + (delta.microseconds / 1e6))
|
||||||
|
|
||||||
|
|
||||||
|
def unix_time_millis(dt=None):
|
||||||
|
return unix_time(dt) * 1000.0
|
||||||
|
@ -26,5 +26,6 @@ def test_sqs_list_identities():
|
|||||||
|
|
||||||
res = test_client.get(
|
res = test_client.get(
|
||||||
'/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1')
|
'/123/testqueue?Action=ReceiveMessage&MaxNumberOfMessages=1')
|
||||||
|
|
||||||
message = re.search("<Body>(.*?)</Body>", res.data).groups()[0]
|
message = re.search("<Body>(.*?)</Body>", res.data).groups()[0]
|
||||||
message.should.equal('test-message')
|
message.should.equal('test-message')
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
import boto
|
import boto
|
||||||
from boto.exception import SQSError
|
from boto.exception import SQSError
|
||||||
from boto.sqs.message import RawMessage
|
from boto.sqs.message import RawMessage
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
import sure # noqa
|
import sure # noqa
|
||||||
|
import time
|
||||||
|
|
||||||
from moto import mock_sqs
|
from moto import mock_sqs
|
||||||
|
|
||||||
@ -75,30 +77,140 @@ def test_set_queue_attribute():
|
|||||||
def test_send_message():
|
def test_send_message():
|
||||||
conn = boto.connect_sqs('the_key', 'the_secret')
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
conn.send_message(queue, 'this is a test message')
|
body_one = 'this is a test message'
|
||||||
conn.send_message(queue, 'this is another test message')
|
body_two = 'this is another test message'
|
||||||
|
|
||||||
|
queue.write(queue.new_message(body_one))
|
||||||
|
queue.write(queue.new_message(body_two))
|
||||||
|
|
||||||
|
messages = conn.receive_message(queue, number_messages=2)
|
||||||
|
|
||||||
|
messages[0].get_body().should.equal(body_one)
|
||||||
|
messages[1].get_body().should.equal(body_two)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_send_message_with_delay():
|
||||||
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
|
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
body_one = 'this is a test message'
|
||||||
|
body_two = 'this is another test message'
|
||||||
|
|
||||||
|
queue.write(queue.new_message(body_one), delay_seconds=60)
|
||||||
|
queue.write(queue.new_message(body_two))
|
||||||
|
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
messages = conn.receive_message(queue, number_messages=2)
|
||||||
|
assert len(messages) == 1
|
||||||
|
message = messages[0]
|
||||||
|
assert message.get_body().should.equal(body_two)
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_message_becomes_inflight_when_received():
|
||||||
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
|
queue = conn.create_queue("test-queue", visibility_timeout=2)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
body_one = 'this is a test message'
|
||||||
|
queue.write(queue.new_message(body_one))
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
messages = conn.receive_message(queue, number_messages=1)
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
messages[0].get_body().should.equal('this is a test message')
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
# Wait
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_change_message_visibility():
|
||||||
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
|
queue = conn.create_queue("test-queue", visibility_timeout=2)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
body_one = 'this is another test message'
|
||||||
|
queue.write(queue.new_message(body_one))
|
||||||
|
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
messages[0].change_visibility(2)
|
||||||
|
|
||||||
|
# Wait
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Message is not visible
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Message now becomes visible
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
messages[0].delete()
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_message_attributes():
|
||||||
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
|
queue = conn.create_queue("test-queue", visibility_timeout=2)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
body_one = 'this is another test message'
|
||||||
|
queue.write(queue.new_message(body_one))
|
||||||
|
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
message_attributes = messages[0].attributes
|
||||||
|
|
||||||
|
assert message_attributes.get('ApproximateFirstReceiveTimestamp')
|
||||||
|
assert int(message_attributes.get('ApproximateReceiveCount')) == 1
|
||||||
|
assert message_attributes.get('SentTimestamp')
|
||||||
|
assert message_attributes.get('SenderId')
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_read_message_from_queue():
|
def test_read_message_from_queue():
|
||||||
conn = boto.connect_sqs()
|
conn = boto.connect_sqs()
|
||||||
queue = conn.create_queue('testqueue')
|
queue = conn.create_queue('testqueue')
|
||||||
queue.write(queue.new_message('foo bar baz'))
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
body = 'foo bar baz'
|
||||||
|
queue.write(queue.new_message(body))
|
||||||
message = queue.read(1)
|
message = queue.read(1)
|
||||||
message.get_body().should.equal('foo bar baz')
|
message.get_body().should.equal(body)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_queue_length():
|
def test_queue_length():
|
||||||
conn = boto.connect_sqs('the_key', 'the_secret')
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
conn.send_message(queue, 'this is a test message')
|
queue.write(queue.new_message('this is a test message'))
|
||||||
conn.send_message(queue, 'this is another test message')
|
queue.write(queue.new_message('this is another test message'))
|
||||||
queue.count().should.equal(2)
|
queue.count().should.equal(2)
|
||||||
|
|
||||||
|
|
||||||
@ -106,15 +218,22 @@ def test_queue_length():
|
|||||||
def test_delete_message():
|
def test_delete_message():
|
||||||
conn = boto.connect_sqs('the_key', 'the_secret')
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
queue = conn.create_queue("test-queue", visibility_timeout=60)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
conn.send_message(queue, 'this is a test message')
|
queue.write(queue.new_message('this is a test message'))
|
||||||
conn.send_message(queue, 'this is another test message')
|
queue.write(queue.new_message('this is another test message'))
|
||||||
|
queue.count().should.equal(2)
|
||||||
|
|
||||||
messages = conn.receive_message(queue, number_messages=1)
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
assert len(messages) == 1
|
||||||
messages[0].delete()
|
messages[0].delete()
|
||||||
|
|
||||||
queue.count().should.equal(1)
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
assert len(messages) == 1
|
||||||
|
messages[0].delete()
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
|
||||||
@mock_sqs
|
@mock_sqs
|
||||||
def test_send_batch_operation():
|
def test_send_batch_operation():
|
||||||
@ -187,3 +306,53 @@ def test_queue_attributes():
|
|||||||
attribute_names.should.contain('VisibilityTimeout')
|
attribute_names.should.contain('VisibilityTimeout')
|
||||||
attribute_names.should.contain('LastModifiedTimestamp')
|
attribute_names.should.contain('LastModifiedTimestamp')
|
||||||
attribute_names.should.contain('QueueArn')
|
attribute_names.should.contain('QueueArn')
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_change_message_visibility_on_invalid_receipt():
|
||||||
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
|
queue = conn.create_queue("test-queue", visibility_timeout=1)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
queue.write(queue.new_message('this is another test message'))
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
original_message = messages[0]
|
||||||
|
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
original_message.change_visibility.when.called_with(100).should.throw(SQSError)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_sqs
|
||||||
|
def test_change_message_visibility_on_visible_message():
|
||||||
|
conn = boto.connect_sqs('the_key', 'the_secret')
|
||||||
|
queue = conn.create_queue("test-queue", visibility_timeout=1)
|
||||||
|
queue.set_message_class(RawMessage)
|
||||||
|
|
||||||
|
queue.write(queue.new_message('this is another test message'))
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
messages = conn.receive_message(queue, number_messages=1)
|
||||||
|
|
||||||
|
assert len(messages) == 1
|
||||||
|
|
||||||
|
original_message = messages[0]
|
||||||
|
|
||||||
|
queue.count().should.equal(0)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
queue.count().should.equal(1)
|
||||||
|
|
||||||
|
original_message.change_visibility.when.called_with(100).should.throw(SQSError)
|
||||||
|
Loading…
Reference in New Issue
Block a user