Add ApproximateArrivalTimestamp and MillisBehindLatest to Kinesis get_records response (#1715)
* Add ApproximateArrivalTimestamp to Kinesis response * Add MillisBehindLatest to Kinesis get_records response
This commit is contained in:
parent
bb6da93891
commit
46dd351965
@ -19,19 +19,20 @@ from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose
|
||||
|
||||
|
||||
class Record(BaseModel):
|
||||
|
||||
def __init__(self, partition_key, data, sequence_number, explicit_hash_key):
|
||||
self.partition_key = partition_key
|
||||
self.data = data
|
||||
self.sequence_number = sequence_number
|
||||
self.explicit_hash_key = explicit_hash_key
|
||||
self.create_at = unix_time()
|
||||
self.created_at_datetime = datetime.datetime.utcnow()
|
||||
self.created_at = unix_time(self.created_at_datetime)
|
||||
|
||||
def to_json(self):
|
||||
return {
|
||||
"Data": self.data,
|
||||
"PartitionKey": self.partition_key,
|
||||
"SequenceNumber": str(self.sequence_number),
|
||||
"ApproximateArrivalTimestamp": self.created_at_datetime.isoformat()
|
||||
}
|
||||
|
||||
|
||||
@ -50,16 +51,21 @@ class Shard(BaseModel):
|
||||
def get_records(self, last_sequence_id, limit):
|
||||
last_sequence_id = int(last_sequence_id)
|
||||
results = []
|
||||
secs_behind_latest = 0
|
||||
|
||||
for sequence_number, record in self.records.items():
|
||||
if sequence_number > last_sequence_id:
|
||||
results.append(record)
|
||||
last_sequence_id = sequence_number
|
||||
|
||||
very_last_record = self.records[next(reversed(self.records))]
|
||||
secs_behind_latest = very_last_record.created_at - record.created_at
|
||||
|
||||
if len(results) == limit:
|
||||
break
|
||||
|
||||
return results, last_sequence_id
|
||||
millis_behind_latest = int(secs_behind_latest * 1000)
|
||||
return results, last_sequence_id, millis_behind_latest
|
||||
|
||||
def put_record(self, partition_key, data, explicit_hash_key):
|
||||
# Note: this function is not safe for concurrency
|
||||
@ -83,12 +89,12 @@ class Shard(BaseModel):
|
||||
return 0
|
||||
|
||||
def get_sequence_number_at(self, at_timestamp):
|
||||
if not self.records or at_timestamp < list(self.records.values())[0].create_at:
|
||||
if not self.records or at_timestamp < list(self.records.values())[0].created_at:
|
||||
return 0
|
||||
else:
|
||||
# find the last item in the list that was created before
|
||||
# at_timestamp
|
||||
r = next((r for r in reversed(self.records.values()) if r.create_at < at_timestamp), None)
|
||||
r = next((r for r in reversed(self.records.values()) if r.created_at < at_timestamp), None)
|
||||
return r.sequence_number
|
||||
|
||||
def to_json(self):
|
||||
@ -226,7 +232,7 @@ class DeliveryStream(BaseModel):
|
||||
|
||||
self.records = []
|
||||
self.status = 'ACTIVE'
|
||||
self.create_at = datetime.datetime.utcnow()
|
||||
self.created_at = datetime.datetime.utcnow()
|
||||
self.last_updated = datetime.datetime.utcnow()
|
||||
|
||||
@property
|
||||
@ -267,7 +273,7 @@ class DeliveryStream(BaseModel):
|
||||
def to_dict(self):
|
||||
return {
|
||||
"DeliveryStreamDescription": {
|
||||
"CreateTimestamp": time.mktime(self.create_at.timetuple()),
|
||||
"CreateTimestamp": time.mktime(self.created_at.timetuple()),
|
||||
"DeliveryStreamARN": self.arn,
|
||||
"DeliveryStreamName": self.name,
|
||||
"DeliveryStreamStatus": self.status,
|
||||
@ -329,12 +335,12 @@ class KinesisBackend(BaseBackend):
|
||||
stream = self.describe_stream(stream_name)
|
||||
shard = stream.get_shard(shard_id)
|
||||
|
||||
records, last_sequence_id = shard.get_records(last_sequence_id, limit)
|
||||
records, last_sequence_id, millis_behind_latest = shard.get_records(last_sequence_id, limit)
|
||||
|
||||
next_shard_iterator = compose_shard_iterator(
|
||||
stream_name, shard, last_sequence_id)
|
||||
|
||||
return next_shard_iterator, records
|
||||
return next_shard_iterator, records, millis_behind_latest
|
||||
|
||||
def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data):
|
||||
stream = self.describe_stream(stream_name)
|
||||
|
@ -80,12 +80,13 @@ class KinesisResponse(BaseResponse):
|
||||
shard_iterator = self.parameters.get("ShardIterator")
|
||||
limit = self.parameters.get("Limit")
|
||||
|
||||
next_shard_iterator, records = self.kinesis_backend.get_records(
|
||||
next_shard_iterator, records, millis_behind_latest = self.kinesis_backend.get_records(
|
||||
shard_iterator, limit)
|
||||
|
||||
return json.dumps({
|
||||
"NextShardIterator": next_shard_iterator,
|
||||
"Records": [record.to_json() for record in records]
|
||||
"Records": [record.to_json() for record in records],
|
||||
'MillisBehindLatest': millis_behind_latest
|
||||
})
|
||||
|
||||
def put_record(self):
|
||||
|
@ -89,6 +89,7 @@ def test_basic_shard_iterator():
|
||||
response = conn.get_records(shard_iterator)
|
||||
shard_iterator = response['NextShardIterator']
|
||||
response['Records'].should.equal([])
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis_deprecated
|
||||
@ -225,6 +226,7 @@ def test_get_records_after_sequence_number():
|
||||
response = conn.get_records(shard_iterator)
|
||||
# And the first result returned should be the third item
|
||||
response['Records'][0]['Data'].should.equal('3')
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis_deprecated
|
||||
@ -262,6 +264,7 @@ def test_get_records_latest():
|
||||
response['Records'].should.have.length_of(1)
|
||||
response['Records'][0]['PartitionKey'].should.equal('last_record')
|
||||
response['Records'][0]['Data'].should.equal('last_record')
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
@ -305,6 +308,7 @@ def test_get_records_at_timestamp():
|
||||
response['Records'].should.have.length_of(len(keys))
|
||||
partition_keys = [r['PartitionKey'] for r in response['Records']]
|
||||
partition_keys.should.equal(keys)
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
@ -330,10 +334,69 @@ def test_get_records_at_very_old_timestamp():
|
||||
shard_iterator = response['ShardIterator']
|
||||
|
||||
response = conn.get_records(ShardIterator=shard_iterator)
|
||||
|
||||
response['Records'].should.have.length_of(len(keys))
|
||||
partition_keys = [r['PartitionKey'] for r in response['Records']]
|
||||
partition_keys.should.equal(keys)
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
def test_get_records_timestamp_filtering():
|
||||
conn = boto3.client('kinesis', region_name="us-west-2")
|
||||
stream_name = "my_stream"
|
||||
conn.create_stream(StreamName=stream_name, ShardCount=1)
|
||||
|
||||
conn.put_record(StreamName=stream_name,
|
||||
Data='0',
|
||||
PartitionKey='0')
|
||||
|
||||
time.sleep(1.0)
|
||||
timestamp = datetime.datetime.utcnow()
|
||||
|
||||
conn.put_record(StreamName=stream_name,
|
||||
Data='1',
|
||||
PartitionKey='1')
|
||||
|
||||
response = conn.describe_stream(StreamName=stream_name)
|
||||
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
||||
response = conn.get_shard_iterator(StreamName=stream_name,
|
||||
ShardId=shard_id,
|
||||
ShardIteratorType='AT_TIMESTAMP',
|
||||
Timestamp=timestamp)
|
||||
shard_iterator = response['ShardIterator']
|
||||
|
||||
response = conn.get_records(ShardIterator=shard_iterator)
|
||||
response['Records'].should.have.length_of(1)
|
||||
response['Records'][0]['PartitionKey'].should.equal('1')
|
||||
response['Records'][0]['ApproximateArrivalTimestamp'].should.be.\
|
||||
greater_than(timestamp)
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
def test_get_records_millis_behind_latest():
|
||||
conn = boto3.client('kinesis', region_name="us-west-2")
|
||||
stream_name = "my_stream"
|
||||
conn.create_stream(StreamName=stream_name, ShardCount=1)
|
||||
|
||||
conn.put_record(StreamName=stream_name,
|
||||
Data='0',
|
||||
PartitionKey='0')
|
||||
time.sleep(1.0)
|
||||
conn.put_record(StreamName=stream_name,
|
||||
Data='1',
|
||||
PartitionKey='1')
|
||||
|
||||
response = conn.describe_stream(StreamName=stream_name)
|
||||
shard_id = response['StreamDescription']['Shards'][0]['ShardId']
|
||||
response = conn.get_shard_iterator(StreamName=stream_name,
|
||||
ShardId=shard_id,
|
||||
ShardIteratorType='TRIM_HORIZON')
|
||||
shard_iterator = response['ShardIterator']
|
||||
|
||||
response = conn.get_records(ShardIterator=shard_iterator, Limit=1)
|
||||
response['Records'].should.have.length_of(1)
|
||||
response['MillisBehindLatest'].should.be.greater_than(0)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
@ -363,6 +426,7 @@ def test_get_records_at_very_new_timestamp():
|
||||
response = conn.get_records(ShardIterator=shard_iterator)
|
||||
|
||||
response['Records'].should.have.length_of(0)
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis
|
||||
@ -385,6 +449,7 @@ def test_get_records_from_empty_stream_at_timestamp():
|
||||
response = conn.get_records(ShardIterator=shard_iterator)
|
||||
|
||||
response['Records'].should.have.length_of(0)
|
||||
response['MillisBehindLatest'].should.equal(0)
|
||||
|
||||
|
||||
@mock_kinesis_deprecated
|
||||
|
Loading…
Reference in New Issue
Block a user