diff --git a/moto/dynamodb2/models.py b/moto/dynamodb2/models.py index 82c3559ea..88f750775 100644 --- a/moto/dynamodb2/models.py +++ b/moto/dynamodb2/models.py @@ -1406,6 +1406,7 @@ class DynamoDBBackend(BaseBackend): range_value = None item = table.get_item(hash_value, range_value) + orig_item = copy.deepcopy(item) if not expected: expected = {} @@ -1439,6 +1440,8 @@ class DynamoDBBackend(BaseBackend): ) else: item.update_with_attribute_updates(attribute_updates) + if table.stream_shard is not None: + table.stream_shard.add(orig_item, item) return item def delete_item( diff --git a/tests/test_awslambda/test_lambda.py b/tests/test_awslambda/test_lambda.py index 4f0bc5063..2bd8f4bb3 100644 --- a/tests/test_awslambda/test_lambda.py +++ b/tests/test_awslambda/test_lambda.py @@ -1161,7 +1161,7 @@ def test_invoke_function_from_sqs(): @mock_logs @mock_lambda @mock_dynamodb2 -def test_invoke_function_from_dynamodb(): +def test_invoke_function_from_dynamodb_put(): logs_conn = boto3.client("logs", region_name="us-east-1") dynamodb = boto3.client("dynamodb", region_name="us-east-1") table_name = "table_with_stream" @@ -1218,6 +1218,72 @@ def test_invoke_function_from_dynamodb(): assert False, "Test Failed" +@mock_logs +@mock_lambda +@mock_dynamodb2 +def test_invoke_function_from_dynamodb_update(): + logs_conn = boto3.client("logs", region_name="us-east-1") + dynamodb = boto3.client("dynamodb", region_name="us-east-1") + table_name = "table_with_stream" + table = dynamodb.create_table( + TableName=table_name, + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}], + StreamSpecification={ + "StreamEnabled": True, + "StreamViewType": "NEW_AND_OLD_IMAGES", + }, + ) + dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}}) + + conn = boto3.client("lambda", region_name="us-east-1") + func = conn.create_function( + FunctionName="testFunction", + Runtime="python2.7", + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"ZipFile": get_test_zip_file3()}, + Description="test lambda function executed after a DynamoDB table is updated", + Timeout=3, + MemorySize=128, + Publish=True, + ) + + response = conn.create_event_source_mapping( + EventSourceArn=table["TableDescription"]["LatestStreamArn"], + FunctionName=func["FunctionArn"], + ) + + assert response["EventSourceArn"] == table["TableDescription"]["LatestStreamArn"] + assert response["State"] == "Enabled" + dynamodb.update_item( + TableName=table_name, + Key={"id": {"S": "item 1"}}, + UpdateExpression="set #attr = :val", + ExpressionAttributeNames={"#attr": "new_attr"}, + ExpressionAttributeValues={":val": {"S": "new_val"}}, + ) + start = time.time() + while (time.time() - start) < 30: + result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction") + log_streams = result.get("logStreams") + if not log_streams: + time.sleep(1) + continue + + assert len(log_streams) == 1 + result = logs_conn.get_log_events( + logGroupName="/aws/lambda/testFunction", + logStreamName=log_streams[0]["logStreamName"], + ) + for event in result.get("events"): + if event["message"] == "get_test_zip_file3 success": + return + time.sleep(1) + + assert False, "Test Failed" + + @mock_logs @mock_lambda @mock_sqs