Merge pull request #2763 from bblommers/bugfix/multiple_items_in_ddbstream
DynamoDB Streams: Bugfix when sending multiple items
This commit is contained in:
commit
05236684f4
@ -981,7 +981,7 @@ class LambdaBackend(BaseBackend):
|
||||
]
|
||||
}
|
||||
func = self._lambdas.get_arn(function_arn)
|
||||
func.invoke(json.dumps(event), {}, {})
|
||||
return func.invoke(json.dumps(event), {}, {})
|
||||
|
||||
def list_tags(self, resource):
|
||||
return self.get_function_by_arn(resource).tags
|
||||
|
@ -70,6 +70,7 @@ def lambda_handler(event, context):
|
||||
def get_test_zip_file3():
|
||||
pfunc = """
|
||||
def lambda_handler(event, context):
|
||||
print("Nr_of_records("+str(len(event['Records']))+")")
|
||||
print("get_test_zip_file3 success")
|
||||
return event
|
||||
"""
|
||||
@ -1111,7 +1112,6 @@ def test_create_event_source_mapping():
|
||||
@mock_lambda
|
||||
@mock_sqs
|
||||
def test_invoke_function_from_sqs():
|
||||
logs_conn = boto3.client("logs", region_name="us-east-1")
|
||||
sqs = boto3.resource("sqs", region_name="us-east-1")
|
||||
queue = sqs.create_queue(QueueName="test-sqs-queue1")
|
||||
|
||||
@ -1137,32 +1137,22 @@ def test_invoke_function_from_sqs():
|
||||
|
||||
sqs_client = boto3.client("sqs", region_name="us-east-1")
|
||||
sqs_client.send_message(QueueUrl=queue.url, MessageBody="test")
|
||||
start = time.time()
|
||||
while (time.time() - start) < 30:
|
||||
result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
|
||||
log_streams = result.get("logStreams")
|
||||
if not log_streams:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
assert len(log_streams) == 1
|
||||
result = logs_conn.get_log_events(
|
||||
logGroupName="/aws/lambda/testFunction",
|
||||
logStreamName=log_streams[0]["logStreamName"],
|
||||
)
|
||||
for event in result.get("events"):
|
||||
if event["message"] == "get_test_zip_file3 success":
|
||||
return
|
||||
time.sleep(1)
|
||||
expected_msg = "get_test_zip_file3 success"
|
||||
log_group = "/aws/lambda/testFunction"
|
||||
msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group)
|
||||
|
||||
assert False, "Test Failed"
|
||||
assert msg_showed_up, (
|
||||
expected_msg
|
||||
+ " was not found after sending an SQS message. All logs: "
|
||||
+ all_logs
|
||||
)
|
||||
|
||||
|
||||
@mock_logs
|
||||
@mock_lambda
|
||||
@mock_dynamodb2
|
||||
def test_invoke_function_from_dynamodb_put():
|
||||
logs_conn = boto3.client("logs", region_name="us-east-1")
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
table_name = "table_with_stream"
|
||||
table = dynamodb.create_table(
|
||||
@ -1197,32 +1187,20 @@ def test_invoke_function_from_dynamodb_put():
|
||||
assert response["State"] == "Enabled"
|
||||
|
||||
dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}})
|
||||
start = time.time()
|
||||
while (time.time() - start) < 30:
|
||||
result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
|
||||
log_streams = result.get("logStreams")
|
||||
if not log_streams:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
assert len(log_streams) == 1
|
||||
result = logs_conn.get_log_events(
|
||||
logGroupName="/aws/lambda/testFunction",
|
||||
logStreamName=log_streams[0]["logStreamName"],
|
||||
)
|
||||
for event in result.get("events"):
|
||||
if event["message"] == "get_test_zip_file3 success":
|
||||
return
|
||||
time.sleep(1)
|
||||
expected_msg = "get_test_zip_file3 success"
|
||||
log_group = "/aws/lambda/testFunction"
|
||||
msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group)
|
||||
|
||||
assert False, "Test Failed"
|
||||
assert msg_showed_up, (
|
||||
expected_msg + " was not found after a DDB insert. All logs: " + all_logs
|
||||
)
|
||||
|
||||
|
||||
@mock_logs
|
||||
@mock_lambda
|
||||
@mock_dynamodb2
|
||||
def test_invoke_function_from_dynamodb_update():
|
||||
logs_conn = boto3.client("logs", region_name="us-east-1")
|
||||
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
|
||||
table_name = "table_with_stream"
|
||||
table = dynamodb.create_table(
|
||||
@ -1234,7 +1212,6 @@ def test_invoke_function_from_dynamodb_update():
|
||||
"StreamViewType": "NEW_AND_OLD_IMAGES",
|
||||
},
|
||||
)
|
||||
dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}})
|
||||
|
||||
conn = boto3.client("lambda", region_name="us-east-1")
|
||||
func = conn.create_function(
|
||||
@ -1249,13 +1226,17 @@ def test_invoke_function_from_dynamodb_update():
|
||||
Publish=True,
|
||||
)
|
||||
|
||||
response = conn.create_event_source_mapping(
|
||||
conn.create_event_source_mapping(
|
||||
EventSourceArn=table["TableDescription"]["LatestStreamArn"],
|
||||
FunctionName=func["FunctionArn"],
|
||||
)
|
||||
|
||||
assert response["EventSourceArn"] == table["TableDescription"]["LatestStreamArn"]
|
||||
assert response["State"] == "Enabled"
|
||||
dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}})
|
||||
log_group = "/aws/lambda/testFunction"
|
||||
expected_msg = "get_test_zip_file3 success"
|
||||
msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group)
|
||||
assert "Nr_of_records(1)" in all_logs, "Only one item should be inserted"
|
||||
|
||||
dynamodb.update_item(
|
||||
TableName=table_name,
|
||||
Key={"id": {"S": "item 1"}},
|
||||
@ -1263,25 +1244,39 @@ def test_invoke_function_from_dynamodb_update():
|
||||
ExpressionAttributeNames={"#attr": "new_attr"},
|
||||
ExpressionAttributeValues={":val": {"S": "new_val"}},
|
||||
)
|
||||
msg_showed_up, all_logs = wait_for_log_msg(expected_msg, log_group)
|
||||
|
||||
assert msg_showed_up, (
|
||||
expected_msg + " was not found after updating DDB. All logs: " + str(all_logs)
|
||||
)
|
||||
assert "Nr_of_records(1)" in all_logs, "Only one item should be updated"
|
||||
assert (
|
||||
"Nr_of_records(2)" not in all_logs
|
||||
), "The inserted item should not show up again"
|
||||
|
||||
|
||||
def wait_for_log_msg(expected_msg, log_group):
|
||||
logs_conn = boto3.client("logs", region_name="us-east-1")
|
||||
received_messages = []
|
||||
start = time.time()
|
||||
while (time.time() - start) < 30:
|
||||
result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
|
||||
while (time.time() - start) < 10:
|
||||
result = logs_conn.describe_log_streams(logGroupName=log_group)
|
||||
log_streams = result.get("logStreams")
|
||||
if not log_streams:
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
assert len(log_streams) == 1
|
||||
result = logs_conn.get_log_events(
|
||||
logGroupName="/aws/lambda/testFunction",
|
||||
logStreamName=log_streams[0]["logStreamName"],
|
||||
)
|
||||
for event in result.get("events"):
|
||||
if event["message"] == "get_test_zip_file3 success":
|
||||
return
|
||||
for log_stream in log_streams:
|
||||
result = logs_conn.get_log_events(
|
||||
logGroupName=log_group, logStreamName=log_stream["logStreamName"],
|
||||
)
|
||||
received_messages.extend(
|
||||
[event["message"] for event in result.get("events")]
|
||||
)
|
||||
if expected_msg in received_messages:
|
||||
return True, received_messages
|
||||
time.sleep(1)
|
||||
|
||||
assert False, "Test Failed"
|
||||
return False, received_messages
|
||||
|
||||
|
||||
@mock_logs
|
||||
|
Loading…
Reference in New Issue
Block a user