Merge pull request #2343 from danguer/support-iterator-type-at-after-sequence
Adding support for AT_SEQUENCE_NUMBER and AFTER_SEQUENCE_NUMBER
This commit is contained in:
commit
13c4f2e794
@ -363,7 +363,7 @@ class StreamRecord(BaseModel):
|
|||||||
'dynamodb': {
|
'dynamodb': {
|
||||||
'StreamViewType': stream_type,
|
'StreamViewType': stream_type,
|
||||||
'ApproximateCreationDateTime': datetime.datetime.utcnow().isoformat(),
|
'ApproximateCreationDateTime': datetime.datetime.utcnow().isoformat(),
|
||||||
'SequenceNumber': seq,
|
'SequenceNumber': str(seq),
|
||||||
'SizeBytes': 1,
|
'SizeBytes': 1,
|
||||||
'Keys': keys
|
'Keys': keys
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ class ShardIterator(BaseModel):
|
|||||||
def get(self, limit=1000):
|
def get(self, limit=1000):
|
||||||
items = self.stream_shard.get(self.sequence_number, limit)
|
items = self.stream_shard.get(self.sequence_number, limit)
|
||||||
try:
|
try:
|
||||||
last_sequence_number = max(i['dynamodb']['SequenceNumber'] for i in items)
|
last_sequence_number = max(int(i['dynamodb']['SequenceNumber']) for i in items)
|
||||||
new_shard_iterator = ShardIterator(self.streams_backend,
|
new_shard_iterator = ShardIterator(self.streams_backend,
|
||||||
self.stream_shard,
|
self.stream_shard,
|
||||||
'AFTER_SEQUENCE_NUMBER',
|
'AFTER_SEQUENCE_NUMBER',
|
||||||
|
@ -3,6 +3,7 @@ from __future__ import unicode_literals
|
|||||||
from moto.core.responses import BaseResponse
|
from moto.core.responses import BaseResponse
|
||||||
|
|
||||||
from .models import dynamodbstreams_backends
|
from .models import dynamodbstreams_backends
|
||||||
|
from six import string_types
|
||||||
|
|
||||||
|
|
||||||
class DynamoDBStreamsHandler(BaseResponse):
|
class DynamoDBStreamsHandler(BaseResponse):
|
||||||
@ -23,8 +24,13 @@ class DynamoDBStreamsHandler(BaseResponse):
|
|||||||
arn = self._get_param('StreamArn')
|
arn = self._get_param('StreamArn')
|
||||||
shard_id = self._get_param('ShardId')
|
shard_id = self._get_param('ShardId')
|
||||||
shard_iterator_type = self._get_param('ShardIteratorType')
|
shard_iterator_type = self._get_param('ShardIteratorType')
|
||||||
|
sequence_number = self._get_param('SequenceNumber')
|
||||||
|
# according to documentation sequence_number param should be string
|
||||||
|
if isinstance(sequence_number, string_types):
|
||||||
|
sequence_number = int(sequence_number)
|
||||||
|
|
||||||
return self.backend.get_shard_iterator(arn, shard_id,
|
return self.backend.get_shard_iterator(arn, shard_id,
|
||||||
shard_iterator_type)
|
shard_iterator_type, sequence_number)
|
||||||
|
|
||||||
def get_records(self):
|
def get_records(self):
|
||||||
arn = self._get_param('ShardIterator')
|
arn = self._get_param('ShardIterator')
|
||||||
|
@ -76,6 +76,34 @@ class TestCore():
|
|||||||
ShardIteratorType='TRIM_HORIZON'
|
ShardIteratorType='TRIM_HORIZON'
|
||||||
)
|
)
|
||||||
assert 'ShardIterator' in resp
|
assert 'ShardIterator' in resp
|
||||||
|
|
||||||
|
def test_get_shard_iterator_at_sequence_number(self):
|
||||||
|
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||||
|
|
||||||
|
resp = conn.describe_stream(StreamArn=self.stream_arn)
|
||||||
|
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||||
|
|
||||||
|
resp = conn.get_shard_iterator(
|
||||||
|
StreamArn=self.stream_arn,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType='AT_SEQUENCE_NUMBER',
|
||||||
|
SequenceNumber=resp['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber']
|
||||||
|
)
|
||||||
|
assert 'ShardIterator' in resp
|
||||||
|
|
||||||
|
def test_get_shard_iterator_after_sequence_number(self):
|
||||||
|
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||||
|
|
||||||
|
resp = conn.describe_stream(StreamArn=self.stream_arn)
|
||||||
|
shard_id = resp['StreamDescription']['Shards'][0]['ShardId']
|
||||||
|
|
||||||
|
resp = conn.get_shard_iterator(
|
||||||
|
StreamArn=self.stream_arn,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType='AFTER_SEQUENCE_NUMBER',
|
||||||
|
SequenceNumber=resp['StreamDescription']['Shards'][0]['SequenceNumberRange']['StartingSequenceNumber']
|
||||||
|
)
|
||||||
|
assert 'ShardIterator' in resp
|
||||||
|
|
||||||
def test_get_records_empty(self):
|
def test_get_records_empty(self):
|
||||||
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
conn = boto3.client('dynamodbstreams', region_name='us-east-1')
|
||||||
@ -135,11 +163,39 @@ class TestCore():
|
|||||||
assert resp['Records'][1]['eventName'] == 'MODIFY'
|
assert resp['Records'][1]['eventName'] == 'MODIFY'
|
||||||
assert resp['Records'][2]['eventName'] == 'DELETE'
|
assert resp['Records'][2]['eventName'] == 'DELETE'
|
||||||
|
|
||||||
|
sequence_number_modify = resp['Records'][1]['dynamodb']['SequenceNumber']
|
||||||
|
|
||||||
# now try fetching from the next shard iterator, it should be
|
# now try fetching from the next shard iterator, it should be
|
||||||
# empty
|
# empty
|
||||||
resp = conn.get_records(ShardIterator=resp['NextShardIterator'])
|
resp = conn.get_records(ShardIterator=resp['NextShardIterator'])
|
||||||
assert len(resp['Records']) == 0
|
assert len(resp['Records']) == 0
|
||||||
|
|
||||||
|
# check that if we get the shard iterator AT_SEQUENCE_NUMBER will get the MODIFY event
|
||||||
|
resp = conn.get_shard_iterator(
|
||||||
|
StreamArn=self.stream_arn,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType='AT_SEQUENCE_NUMBER',
|
||||||
|
SequenceNumber=sequence_number_modify
|
||||||
|
)
|
||||||
|
iterator_id = resp['ShardIterator']
|
||||||
|
resp = conn.get_records(ShardIterator=iterator_id)
|
||||||
|
assert len(resp['Records']) == 2
|
||||||
|
assert resp['Records'][0]['eventName'] == 'MODIFY'
|
||||||
|
assert resp['Records'][1]['eventName'] == 'DELETE'
|
||||||
|
|
||||||
|
# check that if we get the shard iterator AFTER_SEQUENCE_NUMBER will get the DELETE event
|
||||||
|
resp = conn.get_shard_iterator(
|
||||||
|
StreamArn=self.stream_arn,
|
||||||
|
ShardId=shard_id,
|
||||||
|
ShardIteratorType='AFTER_SEQUENCE_NUMBER',
|
||||||
|
SequenceNumber=sequence_number_modify
|
||||||
|
)
|
||||||
|
iterator_id = resp['ShardIterator']
|
||||||
|
resp = conn.get_records(ShardIterator=iterator_id)
|
||||||
|
assert len(resp['Records']) == 1
|
||||||
|
assert resp['Records'][0]['eventName'] == 'DELETE'
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TestEdges():
|
class TestEdges():
|
||||||
mocks = []
|
mocks = []
|
||||||
|
Loading…
Reference in New Issue
Block a user