From d560ff002d6a188ceba295a012ef47415896c753 Mon Sep 17 00:00:00 2001 From: David Laban Date: Sat, 5 Nov 2022 13:09:58 +0000 Subject: [PATCH] Remove microsleep (#1903) Use `threading.Condition` instead of `sleep()` to prevent high cpu usage while SQS long polling on an empty queue. Ref: #871 Ref: #1916 Co-authored-by: Brian Pandola --- moto/sqs/models.py | 16 +++++++++++----- tests/test_sqs/test_server.py | 17 +++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/moto/sqs/models.py b/moto/sqs/models.py index b213239cb..9362b20a2 100644 --- a/moto/sqs/models.py +++ b/moto/sqs/models.py @@ -7,6 +7,7 @@ import string import struct from copy import deepcopy from typing import Dict +from threading import Condition from xml.sax.saxutils import escape from moto.core.exceptions import RESTError @@ -257,6 +258,7 @@ class Queue(CloudFormationModel): self._messages = [] self._pending_messages = set() self.deleted_messages = set() + self._messages_lock = Condition() now = unix_time() self.created_timestamp = now @@ -536,7 +538,9 @@ class Queue(CloudFormationModel): if diff / 1000 < DEDUPLICATION_TIME_IN_SECONDS: return - self._messages.append(message) + with self._messages_lock: + self._messages.append(message) + self._messages_lock.notify_all() for arn, esm in self.lambda_event_source_mappings.items(): backend = sqs_backends[self.account_id][self.region] @@ -591,6 +595,10 @@ class Queue(CloudFormationModel): new_messages.append(message) self._messages = new_messages + def wait_for_messages(self, timeout): + with self._messages_lock: + self._messages_lock.wait_for(lambda: self.messages, timeout=timeout) + @classmethod def has_cfn_attr(cls, attr): return attr in ["Arn", "QueueName"] @@ -930,13 +938,11 @@ class SQSBackend(BaseBackend): if previous_result_count == len(result): if wait_seconds_timeout == 0: - # There is timeout and we have added no additional results, + # There is no timeout and no additional results, # so break to avoid an infinite loop. break - import time - - time.sleep(0.01) + queue.wait_for_messages(wait_seconds_timeout) continue previous_result_count = len(result) diff --git a/tests/test_sqs/test_server.py b/tests/test_sqs/test_server.py index d8f068ab5..afdd888fd 100644 --- a/tests/test_sqs/test_server.py +++ b/tests/test_sqs/test_server.py @@ -1,3 +1,4 @@ +import datetime import re import sure # noqa # pylint: disable=unused-import import threading @@ -80,3 +81,19 @@ def test_messages_polling(): # got each message in a separate call to ReceiveMessage, despite the long # WaitTimeSeconds assert len(messages) == 5 + + +def test_no_messages_polling_timeout(): + backend = server.create_backend_app("sqs") + queue_name = "test-queue" + test_client = backend.test_client() + test_client.put(f"/?Action=CreateQueue&QueueName={queue_name}") + wait_seconds = 5 + start = datetime.datetime.utcnow() + test_client.get( + f"/123/{queue_name}?Action=ReceiveMessage&MaxNumberOfMessages=1&WaitTimeSeconds={wait_seconds}" + ) + end = datetime.datetime.utcnow() + duration = end - start + assert duration.seconds >= wait_seconds + assert duration.seconds <= wait_seconds + (wait_seconds / 2)