From 7091be8eaec81a6044649f811341af4691de7184 Mon Sep 17 00:00:00 2001 From: Daniel Guerrero Date: Mon, 29 Jul 2019 21:13:58 -0500 Subject: [PATCH 1/5] Adding support for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER Adding support on DynamoDB Streams for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER ShardIteratorType Change SequenceNumber type to string instead of int to match documentation --- moto/dynamodb2/models.py | 2 +- moto/dynamodbstreams/models.py | 2 +- moto/dynamodbstreams/responses.py | 7 ++- .../test_dynamodbstreams.py | 56 +++++++++++++++++++ 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/moto/dynamodb2/models.py b/moto/dynamodb2/models.py index e868caaa8..4ef4461cd 100644 --- a/moto/dynamodb2/models.py +++ b/moto/dynamodb2/models.py @@ -363,7 +363,7 @@ class StreamRecord(BaseModel): 'dynamodb': { 'StreamViewType': stream_type, 'ApproximateCreationDateTime': datetime.datetime.utcnow().isoformat(), - 'SequenceNumber': seq, + 'SequenceNumber': str(seq), 'SizeBytes': 1, 'Keys': keys } diff --git a/moto/dynamodbstreams/models.py b/moto/dynamodbstreams/models.py index 41cc6e280..3e20ae13f 100644 --- a/moto/dynamodbstreams/models.py +++ b/moto/dynamodbstreams/models.py @@ -39,7 +39,7 @@ class ShardIterator(BaseModel): def get(self, limit=1000): items = self.stream_shard.get(self.sequence_number, limit) try: - last_sequence_number = max(i['dynamodb']['SequenceNumber'] for i in items) + last_sequence_number = max(int(i['dynamodb']['SequenceNumber']) for i in items) new_shard_iterator = ShardIterator(self.streams_backend, self.stream_shard, 'AFTER_SEQUENCE_NUMBER', diff --git a/moto/dynamodbstreams/responses.py b/moto/dynamodbstreams/responses.py index c9c113615..0e2800f55 100644 --- a/moto/dynamodbstreams/responses.py +++ b/moto/dynamodbstreams/responses.py @@ -23,8 +23,13 @@ class DynamoDBStreamsHandler(BaseResponse): arn = self._get_param('StreamArn') shard_id = self._get_param('ShardId') shard_iterator_type = self._get_param('ShardIteratorType') + sequence_number = self._get_param('SequenceNumber') + #according to documentation sequence_number param should be string + if isinstance(sequence_number, str): + sequence_number = int(sequence_number) + return self.backend.get_shard_iterator(arn, shard_id, - shard_iterator_type) + shard_iterator_type, sequence_number) def get_records(self): arn = self._get_param('ShardIterator') diff --git a/tests/test_dynamodbstreams/test_dynamodbstreams.py b/tests/test_dynamodbstreams/test_dynamodbstreams.py index b60c21053..f1c59fa29 100644 --- a/tests/test_dynamodbstreams/test_dynamodbstreams.py +++ b/tests/test_dynamodbstreams/test_dynamodbstreams.py @@ -76,6 +76,34 @@ class TestCore(): ShardIteratorType='TRIM_HORIZON' ) assert 'ShardIterator' in resp + + def test_get_shard_iterator_at_sequence_number(self): + conn = boto3.client('dynamodbstreams', region_name='us-east-1') + + resp = conn.describe_stream(StreamArn=self.stream_arn) + shard_id = resp['StreamDescription']['Shards'][0]['ShardId'] + + resp = conn.get_shard_iterator( + StreamArn=self.stream_arn, + ShardId=shard_id, + ShardIteratorType='AT_SEQUENCE_NUMBER', + SequenceNumber=resp['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber'] + ) + assert 'ShardIterator' in resp + + def test_get_shard_iterator_after_sequence_number(self): + conn = boto3.client('dynamodbstreams', region_name='us-east-1') + + resp = conn.describe_stream(StreamArn=self.stream_arn) + shard_id = resp['StreamDescription']['Shards'][0]['ShardId'] + + resp = conn.get_shard_iterator( + StreamArn=self.stream_arn, + ShardId=shard_id, + ShardIteratorType='AFTER_SEQUENCE_NUMBER', + SequenceNumber=resp['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber'] + ) + assert 'ShardIterator' in resp def test_get_records_empty(self): conn = boto3.client('dynamodbstreams', region_name='us-east-1') @@ -135,11 +163,39 @@ class TestCore(): assert resp['Records'][1]['eventName'] == 'MODIFY' assert resp['Records'][2]['eventName'] == 'DELETE' + sequence_number_modify = resp['Records'][1]['dynamodb']['SequenceNumber'] + # now try fetching from the next shard iterator, it should be # empty resp = conn.get_records(ShardIterator=resp['NextShardIterator']) assert len(resp['Records']) == 0 + #check that if we get the shard iterator AT_SEQUENCE_NUMBER will get the MODIFY event + resp = conn.get_shard_iterator( + StreamArn=self.stream_arn, + ShardId=shard_id, + ShardIteratorType='AT_SEQUENCE_NUMBER', + SequenceNumber=sequence_number_modify + ) + iterator_id = resp['ShardIterator'] + resp = conn.get_records(ShardIterator=iterator_id) + assert len(resp['Records']) == 2 + assert resp['Records'][0]['eventName'] == 'MODIFY' + assert resp['Records'][1]['eventName'] == 'DELETE' + + #check that if we get the shard iterator AFTER_SEQUENCE_NUMBER will get the DELETE event + resp = conn.get_shard_iterator( + StreamArn=self.stream_arn, + ShardId=shard_id, + ShardIteratorType='AFTER_SEQUENCE_NUMBER', + SequenceNumber=sequence_number_modify + ) + iterator_id = resp['ShardIterator'] + resp = conn.get_records(ShardIterator=iterator_id) + assert len(resp['Records']) == 1 + assert resp['Records'][0]['eventName'] == 'DELETE' + + class TestEdges(): mocks = [] From bfc401c520b38f60adb29d5aba09bcead2da13c0 Mon Sep 17 00:00:00 2001 From: Daniel Guerrero Date: Mon, 29 Jul 2019 21:21:02 -0500 Subject: [PATCH 2/5] Fixing comments conventions --- moto/dynamodbstreams/responses.py | 2 +- tests/test_dynamodbstreams/test_dynamodbstreams.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/moto/dynamodbstreams/responses.py b/moto/dynamodbstreams/responses.py index 0e2800f55..c4e61a750 100644 --- a/moto/dynamodbstreams/responses.py +++ b/moto/dynamodbstreams/responses.py @@ -24,7 +24,7 @@ class DynamoDBStreamsHandler(BaseResponse): shard_id = self._get_param('ShardId') shard_iterator_type = self._get_param('ShardIteratorType') sequence_number = self._get_param('SequenceNumber') - #according to documentation sequence_number param should be string + # according to documentation sequence_number param should be string if isinstance(sequence_number, str): sequence_number = int(sequence_number) diff --git a/tests/test_dynamodbstreams/test_dynamodbstreams.py b/tests/test_dynamodbstreams/test_dynamodbstreams.py index f1c59fa29..deb9f9283 100644 --- a/tests/test_dynamodbstreams/test_dynamodbstreams.py +++ b/tests/test_dynamodbstreams/test_dynamodbstreams.py @@ -170,7 +170,7 @@ class TestCore(): resp = conn.get_records(ShardIterator=resp['NextShardIterator']) assert len(resp['Records']) == 0 - #check that if we get the shard iterator AT_SEQUENCE_NUMBER will get the MODIFY event + # check that if we get the shard iterator AT_SEQUENCE_NUMBER will get the MODIFY event resp = conn.get_shard_iterator( StreamArn=self.stream_arn, ShardId=shard_id, @@ -183,7 +183,7 @@ class TestCore(): assert resp['Records'][0]['eventName'] == 'MODIFY' assert resp['Records'][1]['eventName'] == 'DELETE' - #check that if we get the shard iterator AFTER_SEQUENCE_NUMBER will get the DELETE event + # check that if we get the shard iterator AFTER_SEQUENCE_NUMBER will get the DELETE event resp = conn.get_shard_iterator( StreamArn=self.stream_arn, ShardId=shard_id, From 364bd0720d6e3f8b93bc3416927ad37f48911691 Mon Sep 17 00:00:00 2001 From: Daniel Guerrero Date: Tue, 30 Jul 2019 13:54:42 -0500 Subject: [PATCH 3/5] Adding support for python 2.7 Python 2.7 sends unicode type instead string type --- moto/dynamodbstreams/responses.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moto/dynamodbstreams/responses.py b/moto/dynamodbstreams/responses.py index c4e61a750..6ff6ba2f4 100644 --- a/moto/dynamodbstreams/responses.py +++ b/moto/dynamodbstreams/responses.py @@ -25,7 +25,7 @@ class DynamoDBStreamsHandler(BaseResponse): shard_iterator_type = self._get_param('ShardIteratorType') sequence_number = self._get_param('SequenceNumber') # according to documentation sequence_number param should be string - if isinstance(sequence_number, str): + if isinstance(sequence_number, str) or isinstance(sequence_number, unicode): sequence_number = int(sequence_number) return self.backend.get_shard_iterator(arn, shard_id, From 1ce162f0561f10e20c924dbcf0ae2cd27ec78285 Mon Sep 17 00:00:00 2001 From: Daniel Guerrero Date: Tue, 30 Jul 2019 14:15:47 -0500 Subject: [PATCH 4/5] Using string class to detect type Using string class instead unicode that has been removed from python 3 --- moto/dynamodbstreams/responses.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/moto/dynamodbstreams/responses.py b/moto/dynamodbstreams/responses.py index 6ff6ba2f4..c570483c5 100644 --- a/moto/dynamodbstreams/responses.py +++ b/moto/dynamodbstreams/responses.py @@ -25,7 +25,7 @@ class DynamoDBStreamsHandler(BaseResponse): shard_iterator_type = self._get_param('ShardIteratorType') sequence_number = self._get_param('SequenceNumber') # according to documentation sequence_number param should be string - if isinstance(sequence_number, str) or isinstance(sequence_number, unicode): + if isinstance(sequence_number, "".__class__): sequence_number = int(sequence_number) return self.backend.get_shard_iterator(arn, shard_id, From 4d2b12f40da355e55fba91df14a9d11d6570c27f Mon Sep 17 00:00:00 2001 From: Daniel Guerrero Date: Tue, 27 Aug 2019 19:59:43 -0500 Subject: [PATCH 5/5] Adding six.string_types checking --- moto/dynamodbstreams/responses.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/moto/dynamodbstreams/responses.py b/moto/dynamodbstreams/responses.py index c570483c5..7774f3239 100644 --- a/moto/dynamodbstreams/responses.py +++ b/moto/dynamodbstreams/responses.py @@ -3,6 +3,7 @@ from __future__ import unicode_literals from moto.core.responses import BaseResponse from .models import dynamodbstreams_backends +from six import string_types class DynamoDBStreamsHandler(BaseResponse): @@ -25,7 +26,7 @@ class DynamoDBStreamsHandler(BaseResponse): shard_iterator_type = self._get_param('ShardIteratorType') sequence_number = self._get_param('SequenceNumber') # according to documentation sequence_number param should be string - if isinstance(sequence_number, "".__class__): + if isinstance(sequence_number, string_types): sequence_number = int(sequence_number) return self.backend.get_shard_iterator(arn, shard_id,