Merge pull request #2755 from bblommers/dynamodbstreams_update_item

DynamoDB - Send item to DDB Stream on update, not just on create
This commit is contained in:
Steve Pulec 2020-02-18 18:15:25 -06:00 committed by GitHub
commit d297fc08f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 1 deletions

View File

@ -1406,6 +1406,7 @@ class DynamoDBBackend(BaseBackend):
range_value = None
item = table.get_item(hash_value, range_value)
orig_item = copy.deepcopy(item)
if not expected:
expected = {}
@ -1439,6 +1440,8 @@ class DynamoDBBackend(BaseBackend):
)
else:
item.update_with_attribute_updates(attribute_updates)
if table.stream_shard is not None:
table.stream_shard.add(orig_item, item)
return item
def delete_item(

View File

@ -1161,7 +1161,7 @@ def test_invoke_function_from_sqs():
@mock_logs
@mock_lambda
@mock_dynamodb2
def test_invoke_function_from_dynamodb():
def test_invoke_function_from_dynamodb_put():
logs_conn = boto3.client("logs", region_name="us-east-1")
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
table_name = "table_with_stream"
@ -1218,6 +1218,72 @@ def test_invoke_function_from_dynamodb():
assert False, "Test Failed"
@mock_logs
@mock_lambda
@mock_dynamodb2
def test_invoke_function_from_dynamodb_update():
logs_conn = boto3.client("logs", region_name="us-east-1")
dynamodb = boto3.client("dynamodb", region_name="us-east-1")
table_name = "table_with_stream"
table = dynamodb.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
)
dynamodb.put_item(TableName=table_name, Item={"id": {"S": "item 1"}})
conn = boto3.client("lambda", region_name="us-east-1")
func = conn.create_function(
FunctionName="testFunction",
Runtime="python2.7",
Role=get_role_name(),
Handler="lambda_function.lambda_handler",
Code={"ZipFile": get_test_zip_file3()},
Description="test lambda function executed after a DynamoDB table is updated",
Timeout=3,
MemorySize=128,
Publish=True,
)
response = conn.create_event_source_mapping(
EventSourceArn=table["TableDescription"]["LatestStreamArn"],
FunctionName=func["FunctionArn"],
)
assert response["EventSourceArn"] == table["TableDescription"]["LatestStreamArn"]
assert response["State"] == "Enabled"
dynamodb.update_item(
TableName=table_name,
Key={"id": {"S": "item 1"}},
UpdateExpression="set #attr = :val",
ExpressionAttributeNames={"#attr": "new_attr"},
ExpressionAttributeValues={":val": {"S": "new_val"}},
)
start = time.time()
while (time.time() - start) < 30:
result = logs_conn.describe_log_streams(logGroupName="/aws/lambda/testFunction")
log_streams = result.get("logStreams")
if not log_streams:
time.sleep(1)
continue
assert len(log_streams) == 1
result = logs_conn.get_log_events(
logGroupName="/aws/lambda/testFunction",
logStreamName=log_streams[0]["logStreamName"],
)
for event in result.get("events"):
if event["message"] == "get_test_zip_file3 success":
return
time.sleep(1)
assert False, "Test Failed"
@mock_logs
@mock_lambda
@mock_sqs