diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index d9ea3b897..d9a47ea87 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -19,19 +19,20 @@ from .utils import compose_shard_iterator, compose_new_shard_iterator, decompose class Record(BaseModel): - def __init__(self, partition_key, data, sequence_number, explicit_hash_key): self.partition_key = partition_key self.data = data self.sequence_number = sequence_number self.explicit_hash_key = explicit_hash_key - self.create_at = unix_time() + self.created_at_datetime = datetime.datetime.utcnow() + self.created_at = unix_time(self.created_at_datetime) def to_json(self): return { "Data": self.data, "PartitionKey": self.partition_key, "SequenceNumber": str(self.sequence_number), + "ApproximateArrivalTimestamp": self.created_at_datetime.isoformat() } @@ -50,16 +51,21 @@ class Shard(BaseModel): def get_records(self, last_sequence_id, limit): last_sequence_id = int(last_sequence_id) results = [] + secs_behind_latest = 0 for sequence_number, record in self.records.items(): if sequence_number > last_sequence_id: results.append(record) last_sequence_id = sequence_number + very_last_record = self.records[next(reversed(self.records))] + secs_behind_latest = very_last_record.created_at - record.created_at + if len(results) == limit: break - return results, last_sequence_id + millis_behind_latest = int(secs_behind_latest * 1000) + return results, last_sequence_id, millis_behind_latest def put_record(self, partition_key, data, explicit_hash_key): # Note: this function is not safe for concurrency @@ -83,12 +89,12 @@ class Shard(BaseModel): return 0 def get_sequence_number_at(self, at_timestamp): - if not self.records or at_timestamp < list(self.records.values())[0].create_at: + if not self.records or at_timestamp < list(self.records.values())[0].created_at: return 0 else: # find the last item in the list that was created before # at_timestamp - r = next((r for r in reversed(self.records.values()) if r.create_at < at_timestamp), None) + r = next((r for r in reversed(self.records.values()) if r.created_at < at_timestamp), None) return r.sequence_number def to_json(self): @@ -226,7 +232,7 @@ class DeliveryStream(BaseModel): self.records = [] self.status = 'ACTIVE' - self.create_at = datetime.datetime.utcnow() + self.created_at = datetime.datetime.utcnow() self.last_updated = datetime.datetime.utcnow() @property @@ -267,7 +273,7 @@ class DeliveryStream(BaseModel): def to_dict(self): return { "DeliveryStreamDescription": { - "CreateTimestamp": time.mktime(self.create_at.timetuple()), + "CreateTimestamp": time.mktime(self.created_at.timetuple()), "DeliveryStreamARN": self.arn, "DeliveryStreamName": self.name, "DeliveryStreamStatus": self.status, @@ -329,12 +335,12 @@ class KinesisBackend(BaseBackend): stream = self.describe_stream(stream_name) shard = stream.get_shard(shard_id) - records, last_sequence_id = shard.get_records(last_sequence_id, limit) + records, last_sequence_id, millis_behind_latest = shard.get_records(last_sequence_id, limit) next_shard_iterator = compose_shard_iterator( stream_name, shard, last_sequence_id) - return next_shard_iterator, records + return next_shard_iterator, records, millis_behind_latest def put_record(self, stream_name, partition_key, explicit_hash_key, sequence_number_for_ordering, data): stream = self.describe_stream(stream_name) diff --git a/moto/kinesis/responses.py b/moto/kinesis/responses.py index b9b4883ef..72b2af4ce 100644 --- a/moto/kinesis/responses.py +++ b/moto/kinesis/responses.py @@ -80,12 +80,13 @@ class KinesisResponse(BaseResponse): shard_iterator = self.parameters.get("ShardIterator") limit = self.parameters.get("Limit") - next_shard_iterator, records = self.kinesis_backend.get_records( + next_shard_iterator, records, millis_behind_latest = self.kinesis_backend.get_records( shard_iterator, limit) return json.dumps({ "NextShardIterator": next_shard_iterator, - "Records": [record.to_json() for record in records] + "Records": [record.to_json() for record in records], + 'MillisBehindLatest': millis_behind_latest }) def put_record(self): diff --git a/tests/test_kinesis/test_kinesis.py b/tests/test_kinesis/test_kinesis.py index e3d350023..c70236978 100644 --- a/tests/test_kinesis/test_kinesis.py +++ b/tests/test_kinesis/test_kinesis.py @@ -89,6 +89,7 @@ def test_basic_shard_iterator(): response = conn.get_records(shard_iterator) shard_iterator = response['NextShardIterator'] response['Records'].should.equal([]) + response['MillisBehindLatest'].should.equal(0) @mock_kinesis_deprecated @@ -225,6 +226,7 @@ def test_get_records_after_sequence_number(): response = conn.get_records(shard_iterator) # And the first result returned should be the third item response['Records'][0]['Data'].should.equal('3') + response['MillisBehindLatest'].should.equal(0) @mock_kinesis_deprecated @@ -262,6 +264,7 @@ def test_get_records_latest(): response['Records'].should.have.length_of(1) response['Records'][0]['PartitionKey'].should.equal('last_record') response['Records'][0]['Data'].should.equal('last_record') + response['MillisBehindLatest'].should.equal(0) @mock_kinesis @@ -305,6 +308,7 @@ def test_get_records_at_timestamp(): response['Records'].should.have.length_of(len(keys)) partition_keys = [r['PartitionKey'] for r in response['Records']] partition_keys.should.equal(keys) + response['MillisBehindLatest'].should.equal(0) @mock_kinesis @@ -330,10 +334,69 @@ def test_get_records_at_very_old_timestamp(): shard_iterator = response['ShardIterator'] response = conn.get_records(ShardIterator=shard_iterator) - response['Records'].should.have.length_of(len(keys)) partition_keys = [r['PartitionKey'] for r in response['Records']] partition_keys.should.equal(keys) + response['MillisBehindLatest'].should.equal(0) + + +@mock_kinesis +def test_get_records_timestamp_filtering(): + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + conn.put_record(StreamName=stream_name, + Data='0', + PartitionKey='0') + + time.sleep(1.0) + timestamp = datetime.datetime.utcnow() + + conn.put_record(StreamName=stream_name, + Data='1', + PartitionKey='1') + + response = conn.describe_stream(StreamName=stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType='AT_TIMESTAMP', + Timestamp=timestamp) + shard_iterator = response['ShardIterator'] + + response = conn.get_records(ShardIterator=shard_iterator) + response['Records'].should.have.length_of(1) + response['Records'][0]['PartitionKey'].should.equal('1') + response['Records'][0]['ApproximateArrivalTimestamp'].should.be.\ + greater_than(timestamp) + response['MillisBehindLatest'].should.equal(0) + + +@mock_kinesis +def test_get_records_millis_behind_latest(): + conn = boto3.client('kinesis', region_name="us-west-2") + stream_name = "my_stream" + conn.create_stream(StreamName=stream_name, ShardCount=1) + + conn.put_record(StreamName=stream_name, + Data='0', + PartitionKey='0') + time.sleep(1.0) + conn.put_record(StreamName=stream_name, + Data='1', + PartitionKey='1') + + response = conn.describe_stream(StreamName=stream_name) + shard_id = response['StreamDescription']['Shards'][0]['ShardId'] + response = conn.get_shard_iterator(StreamName=stream_name, + ShardId=shard_id, + ShardIteratorType='TRIM_HORIZON') + shard_iterator = response['ShardIterator'] + + response = conn.get_records(ShardIterator=shard_iterator, Limit=1) + response['Records'].should.have.length_of(1) + response['MillisBehindLatest'].should.be.greater_than(0) @mock_kinesis @@ -363,6 +426,7 @@ def test_get_records_at_very_new_timestamp(): response = conn.get_records(ShardIterator=shard_iterator) response['Records'].should.have.length_of(0) + response['MillisBehindLatest'].should.equal(0) @mock_kinesis @@ -385,6 +449,7 @@ def test_get_records_from_empty_stream_at_timestamp(): response = conn.get_records(ShardIterator=shard_iterator) response['Records'].should.have.length_of(0) + response['MillisBehindLatest'].should.equal(0) @mock_kinesis_deprecated