Add fifo high throughput (#4224)

This commit is contained in:
Anton Grübel 2021-08-27 00:23:17 +09:00 committed by GitHub
parent 21021a6a03
commit e865362791
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 193 additions and 58 deletions

View File

@ -6,7 +6,7 @@ class SNSNotFoundError(RESTError):
code = 404
def __init__(self, message, **kwargs):
super(SNSNotFoundError, self).__init__("NotFound", message, **kwargs)
super().__init__("NotFound", message, **kwargs)
class ResourceNotFoundError(RESTError):

View File

@ -365,7 +365,7 @@ class SNSBackend(BaseBackend):
def __init__(self, region_name):
super(SNSBackend, self).__init__()
self.topics = OrderedDict()
self.subscriptions = OrderedDict()
self.subscriptions: OrderedDict[str, Subscription] = OrderedDict()
self.applications = {}
self.platform_endpoints = {}
self.region_name = region_name
@ -627,10 +627,12 @@ class SNSBackend(BaseBackend):
raise SNSNotFoundError("Endpoint with arn {0} not found".format(arn))
def get_subscription_attributes(self, arn):
_subscription = [_ for _ in self.subscriptions.values() if _.arn == arn]
if not _subscription:
raise SNSNotFoundError("Subscription with arn {0} not found".format(arn))
subscription = _subscription[0]
subscription = self.subscriptions.get(arn)
if not subscription:
raise SNSNotFoundError(
"Subscription does not exist", template="wrapped_single_error"
)
return subscription.attributes

View File

@ -26,12 +26,13 @@ class MessageAttributesInvalid(RESTError):
class QueueDoesNotExist(RESTError):
code = 404
code = 400
def __init__(self):
super(QueueDoesNotExist, self).__init__(
"QueueDoesNotExist",
super().__init__(
"AWS.SimpleQueueService.NonExistentQueue",
"The specified queue does not exist for this wsdl version.",
template="wrapped_single_error",
)

View File

@ -9,6 +9,7 @@ import string
import struct
from copy import deepcopy
from typing import Dict
from xml.sax.saxutils import escape
from boto3 import Session
@ -221,7 +222,12 @@ class Queue(CloudFormationModel):
"ReceiveMessageWaitTimeSeconds",
"VisibilityTimeout",
]
FIFO_ATTRIBUTES = ["FifoQueue", "ContentBasedDeduplication"]
FIFO_ATTRIBUTES = [
"ContentBasedDeduplication",
"DeduplicationScope",
"FifoQueue",
"FifoThroughputLimit",
]
KMS_ATTRIBUTES = ["KmsDataKeyReusePeriodSeconds", "KmsMasterKeyId"]
ALLOWED_PERMISSIONS = (
"*",
@ -256,8 +262,10 @@ class Queue(CloudFormationModel):
# default settings for a non fifo queue
defaults = {
"ContentBasedDeduplication": "false",
"DeduplicationScope": "queue",
"DelaySeconds": 0,
"FifoQueue": "false",
"FifoThroughputLimit": "perQueue",
"KmsDataKeyReusePeriodSeconds": 300, # five minutes
"KmsMasterKeyId": None,
"MaximumMessageSize": MAXIMUM_MESSAGE_LENGTH,
@ -567,7 +575,7 @@ def _filter_message_attributes(message, input_message_attributes):
class SQSBackend(BaseBackend):
def __init__(self, region_name):
self.region_name = region_name
self.queues = {}
self.queues: Dict[str, Queue] = {}
super(SQSBackend, self).__init__()
def reset(self):
@ -628,16 +636,13 @@ class SQSBackend(BaseBackend):
return queue
def delete_queue(self, queue_name):
if queue_name in self.queues:
return self.queues.pop(queue_name)
return False
self.get_queue(queue_name)
del self.queues[queue_name]
def get_queue_attributes(self, queue_name, attribute_names):
queue = self.get_queue(queue_name)
if not len(attribute_names):
attribute_names.append("All")
valid_names = (
["All"]
+ queue.BASE_ATTRIBUTES
@ -1034,7 +1039,9 @@ class SQSBackend(BaseBackend):
return self.get_queue(queue_name)
def is_message_valid_based_on_retention_period(self, queue_name, message):
message_attributes = self.get_queue_attributes(queue_name, [])
message_attributes = self.get_queue_attributes(
queue_name, ["MessageRetentionPeriod"]
)
retain_until = (
message_attributes.get("MessageRetentionPeriod")
+ message.sent_timestamp / 1000

View File

@ -202,6 +202,10 @@ class SQSResponse(BaseResponse):
attribute_names = self._get_multi_param("AttributeName")
# if connecting to AWS via boto, then 'AttributeName' is just a normal parameter
if not attribute_names:
attribute_names = self.querystring.get("AttributeName")
attributes = self.sqs_backend.get_queue_attributes(queue_name, attribute_names)
template = self.response_template(GET_QUEUE_ATTRIBUTES_RESPONSE)
@ -226,15 +230,11 @@ class SQSResponse(BaseResponse):
def delete_queue(self):
# TODO validate self.get_param('QueueUrl')
queue_name = self._get_queue_name()
queue = self.sqs_backend.delete_queue(queue_name)
if not queue:
return (
"A queue with name {0} does not exist".format(queue_name),
dict(status=404),
)
self.sqs_backend.delete_queue(queue_name)
template = self.response_template(DELETE_QUEUE_RESPONSE)
return template.render(queue=queue)
return template.render()
def send_message(self):
message = self._get_param("MessageBody")

View File

@ -71,6 +71,7 @@ TestAccAWSRedshiftServiceAccount
TestAccAWSRolePolicyAttachment
TestAccAWSSNSSMSPreferences
TestAccAWSSageMakerPrebuiltECRImage
TestAccAWSSQSQueuePolicy
TestAccAWSSsmParameterDataSource
TestAccAWSUserGroupMembership
TestAccAWSUserPolicyAttachment

View File

@ -539,3 +539,21 @@ def test_confirm_subscription():
Token="2336412f37fb687f5d51e6e241d59b68c4e583a5cee0be6f95bbf97ab8d2441cf47b99e848408adaadf4c197e65f03473d53c4ba398f6abbf38ce2e8ebf7b4ceceb2cd817959bcde1357e58a2861b05288c535822eb88cac3db04f592285249971efc6484194fc4a4586147f16916692",
AuthenticateOnUnsubscribe="true",
)
@mock_sns
def test_get_subscription_attributes_error_not_exists():
# given
client = boto3.client("sns", region_name="us-east-1")
sub_arn = f"arn:aws:sqs:us-east-1:{DEFAULT_ACCOUNT_ID}:test-queue:66d97e76-31e5-444f-8fa7-b60b680d0d39"
# when
with pytest.raises(ClientError) as e:
client.get_subscription_attributes(SubscriptionArn=sub_arn)
# then
ex = e.value
ex.operation_name.should.equal("GetSubscriptionAttributes")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404)
ex.response["Error"]["Code"].should.contain("NotFound")
ex.response["Error"]["Message"].should.equal("Subscription does not exist")

View File

@ -66,7 +66,9 @@ def test_create_queue_with_same_attributes():
sqs = boto3.client("sqs", region_name="us-east-1")
dlq_url = sqs.create_queue(QueueName="test-queue-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(QueueUrl=dlq_url)["Attributes"]["QueueArn"]
dlq_arn = sqs.get_queue_attributes(QueueUrl=dlq_url, AttributeNames=["All"])[
"Attributes"
]["QueueArn"]
attributes = {
"DelaySeconds": "900",
@ -110,15 +112,66 @@ def test_create_queue_with_different_attributes_fail():
@mock_sqs
def test_create_fifo_queue():
sqs = boto3.client("sqs", region_name="us-east-1")
resp = sqs.create_queue(
QueueName="test-queue.fifo", Attributes={"FifoQueue": "true"}
)
queue_url = resp["QueueUrl"]
# given
region_name = "us-east-1"
sqs = boto3.client("sqs", region_name=region_name)
queue_name = "test-queue.fifo"
response = sqs.get_queue_attributes(QueueUrl=queue_url)
response["Attributes"].should.contain("FifoQueue")
response["Attributes"]["FifoQueue"].should.equal("true")
# when
queue_url = sqs.create_queue(
QueueName=queue_name, Attributes={"FifoQueue": "true"}
)["QueueUrl"]
# then
queue_url.should.contain(queue_name)
attributes = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
"Attributes"
]
attributes["ApproximateNumberOfMessages"].should.equal("0")
attributes["ApproximateNumberOfMessagesNotVisible"].should.equal("0")
attributes["ApproximateNumberOfMessagesDelayed"].should.equal("0")
attributes["CreatedTimestamp"].should.be.a(str)
attributes["ContentBasedDeduplication"].should.equal("false")
attributes["DeduplicationScope"].should.equal("queue")
attributes["DelaySeconds"].should.equal("0")
attributes["LastModifiedTimestamp"].should.be.a(str)
attributes["FifoQueue"].should.equal("true")
attributes["FifoThroughputLimit"].should.equal("perQueue")
attributes["MaximumMessageSize"].should.equal("262144")
attributes["MessageRetentionPeriod"].should.equal("345600")
attributes["QueueArn"].should.equal(
f"arn:aws:sqs:{region_name}:{ACCOUNT_ID}:{queue_name}"
)
attributes["ReceiveMessageWaitTimeSeconds"].should.equal("0")
attributes["VisibilityTimeout"].should.equal("30")
@mock_sqs
def test_create_fifo_queue_with_high_throughput():
# given
sqs = boto3.client("sqs", region_name="us-east-1")
queue_name = "test-queue.fifo"
# when
queue_url = sqs.create_queue(
QueueName=queue_name,
Attributes={
"FifoQueue": "true",
"DeduplicationScope": "messageGroup",
"FifoThroughputLimit": "perMessageGroupId",
},
)["QueueUrl"]
# then
queue_url.should.contain(queue_name)
attributes = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])[
"Attributes"
]
attributes["DeduplicationScope"].should.equal("messageGroup")
attributes["FifoQueue"].should.equal("true")
attributes["FifoThroughputLimit"].should.equal("perMessageGroupId")
@mock_sqs
@ -235,12 +288,23 @@ def test_get_queue_url():
@mock_sqs
def test_get_queue_url_errors():
def test_get_queue_url_error_not_exists():
# given
client = boto3.client("sqs", region_name="us-east-1")
client.get_queue_url.when.called_with(QueueName="non-existing-queue").should.throw(
ClientError,
"The specified queue non-existing-queue does not exist for this wsdl version.",
# when
with pytest.raises(ClientError) as e:
client.get_queue_url(QueueName="not-exists")
# then
ex = e.value
ex.operation_name.should.equal("GetQueueUrl")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain(
"AWS.SimpleQueueService.NonExistentQueue"
)
ex.response["Error"]["Message"].should.equal(
"The specified queue does not exist for this wsdl version."
)
@ -253,7 +317,7 @@ def test_get_nonexistent_queue():
ex.operation_name.should.equal("GetQueueUrl")
ex.response["Error"]["Code"].should.equal("AWS.SimpleQueueService.NonExistentQueue")
ex.response["Error"]["Message"].should.equal(
"The specified queue non-existing-queue does not exist for this wsdl version."
"The specified queue does not exist for this wsdl version."
)
with pytest.raises(ClientError) as err:
@ -577,8 +641,25 @@ def test_delete_queue():
queue.delete()
conn.list_queues().get("QueueUrls").should.equal(None)
with pytest.raises(botocore.exceptions.ClientError):
queue.delete()
@mock_sqs
def test_delete_queue_error_not_exists():
client = boto3.client("sqs", region_name="us-east-1")
with pytest.raises(ClientError) as e:
client.delete_queue(
QueueUrl=f"https://queue.amazonaws.com/{ACCOUNT_ID}/not-exists"
)
ex = e.value
ex.operation_name.should.equal("DeleteQueue")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain(
"AWS.SimpleQueueService.NonExistentQueue"
)
ex.response["Error"]["Message"].should.equal(
"The specified queue does not exist for this wsdl version."
)
@mock_sqs
@ -586,9 +667,9 @@ def test_get_queue_attributes():
client = boto3.client("sqs", region_name="us-east-1")
dlq_resp = client.create_queue(QueueName="test-dlr-queue")
dlq_arn1 = client.get_queue_attributes(QueueUrl=dlq_resp["QueueUrl"])["Attributes"][
"QueueArn"
]
dlq_arn1 = client.get_queue_attributes(
QueueUrl=dlq_resp["QueueUrl"], AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
response = client.create_queue(
QueueName="test-queue",
@ -600,7 +681,7 @@ def test_get_queue_attributes():
)
queue_url = response["QueueUrl"]
response = client.get_queue_attributes(QueueUrl=queue_url)
response = client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=["All"])
response["Attributes"]["ApproximateNumberOfMessages"].should.equal("0")
response["Attributes"]["ApproximateNumberOfMessagesDelayed"].should.equal("0")
@ -653,12 +734,6 @@ def test_get_queue_attributes_errors():
response = client.create_queue(QueueName="test-queue")
queue_url = response["QueueUrl"]
client.get_queue_attributes.when.called_with(
QueueUrl=queue_url + "-non-existing"
).should.throw(
ClientError, "The specified queue does not exist for this wsdl version."
)
client.get_queue_attributes.when.called_with(
QueueUrl=queue_url,
AttributeNames=["QueueArn", "not-existing", "VisibilityTimeout"],
@ -673,6 +748,29 @@ def test_get_queue_attributes_errors():
).should.throw(ClientError, "Unknown Attribute .")
@mock_sqs
def test_get_queue_attributes_error_not_exists():
# given
client = boto3.client("sqs", region_name="us-east-1")
# when
with pytest.raises(ClientError) as e:
client.get_queue_attributes(
QueueUrl=f"https://queue.amazonaws.com/{ACCOUNT_ID}/not-exists"
)
# then
ex = e.value
ex.operation_name.should.equal("GetQueueAttributes")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ex.response["Error"]["Code"].should.contain(
"AWS.SimpleQueueService.NonExistentQueue"
)
ex.response["Error"]["Message"].should.equal(
"The specified queue does not exist for this wsdl version."
)
@mock_sqs
def test_set_queue_attribute():
sqs = boto3.resource("sqs", region_name="us-east-1")
@ -2289,13 +2387,17 @@ def test_create_fifo_queue_with_dlq():
QueueName="test-dlr-queue.fifo", Attributes={"FifoQueue": "true"}
)
queue_url1 = resp["QueueUrl"]
queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)["Attributes"]["QueueArn"]
queue_arn1 = sqs.get_queue_attributes(
QueueUrl=queue_url1, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
resp = sqs.create_queue(
QueueName="test-dlr-queue", Attributes={"FifoQueue": "false"}
)
queue_url2 = resp["QueueUrl"]
queue_arn2 = sqs.get_queue_attributes(QueueUrl=queue_url2)["Attributes"]["QueueArn"]
queue_arn2 = sqs.get_queue_attributes(
QueueUrl=queue_url2, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
sqs.create_queue(
QueueName="test-queue.fifo",
@ -2332,9 +2434,9 @@ def test_queue_with_dlq():
QueueName="test-dlr-queue.fifo", Attributes={"FifoQueue": "true"}
)
queue_url1 = resp["QueueUrl"]
queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)["Attributes"][
"QueueArn"
]
queue_arn1 = sqs.get_queue_attributes(
QueueUrl=queue_url1, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
resp = sqs.create_queue(
QueueName="test-queue.fifo",
@ -2390,7 +2492,9 @@ def test_redrive_policy_available():
resp = sqs.create_queue(QueueName="test-deadletter")
queue_url1 = resp["QueueUrl"]
queue_arn1 = sqs.get_queue_attributes(QueueUrl=queue_url1)["Attributes"]["QueueArn"]
queue_arn1 = sqs.get_queue_attributes(
QueueUrl=queue_url1, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
redrive_policy = {"deadLetterTargetArn": queue_arn1, "maxReceiveCount": 1}
resp = sqs.create_queue(
@ -2398,7 +2502,9 @@ def test_redrive_policy_available():
)
queue_url2 = resp["QueueUrl"]
attributes = sqs.get_queue_attributes(QueueUrl=queue_url2)["Attributes"]
attributes = sqs.get_queue_attributes(
QueueUrl=queue_url2, AttributeNames=["RedrivePolicy"]
)["Attributes"]
assert "RedrivePolicy" in attributes
assert json.loads(attributes["RedrivePolicy"]) == redrive_policy