moto/tests/test_stepfunctions/parser/test_stepfunctions_dynamodb_integration.py
2024-03-16 13:03:41 -01:00

131 lines
3.9 KiB
Python

import json
import os
from functools import wraps
from uuid import uuid4
import boto3
import pytest
import requests
from moto import mock_aws
from . import base_url, verify_execution_result
def aws_verified(func):
"""
Function that is verified to work against AWS.
Can be run against AWS at any time by setting:
MOTO_TEST_ALLOW_AWS_REQUEST=true
If this environment variable is not set, the function runs in a `mock_aws` context.
This decorator will:
- Create an IAM-role that can be used by AWSLambda functions table
- Run the test
- Delete the role
"""
@wraps(func)
def pagination_wrapper():
table_name = "table_" + str(uuid4())[0:6]
allow_aws_request = (
os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
)
if allow_aws_request:
return create_table_and_test(table_name, sleep_time=10)
else:
with mock_aws():
requests.post(
f"http://{base_url}/moto-api/config",
json={"stepfunctions": {"execute_state_machine": True}},
)
resp = create_table_and_test(table_name, sleep_time=0)
requests.post(
f"http://{base_url}/moto-api/config",
json={"stepfunctions": {"execute_state_machine": False}},
)
return resp
def create_table_and_test(table_name, sleep_time):
client = boto3.client("dynamodb", region_name="us-east-1")
client.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5},
Tags=[{"Key": "environment", "Value": "moto_tests"}],
)
waiter = client.get_waiter("table_exists")
waiter.wait(TableName=table_name)
try:
resp = func(table_name, sleep_time)
finally:
### CLEANUP ###
client.delete_table(TableName=table_name)
return resp
return pagination_wrapper
@aws_verified
@pytest.mark.aws_verified
def test_state_machine_calling_dynamodb_put(table_name=None, sleep_time=0):
dynamodb = boto3.client("dynamodb", "us-east-1")
exec_input = {
"TableName": table_name,
"Item": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
}
expected_status = "SUCCEEDED"
tmpl_name = "services/dynamodb_put_item"
def _verify_result(client, execution, execution_arn):
assert "stopDate" in execution
assert json.loads(execution["input"]) == exec_input
items = dynamodb.scan(TableName=table_name)["Items"]
assert items == [exec_input["Item"]]
verify_execution_result(
_verify_result,
expected_status,
tmpl_name,
exec_input=json.dumps(exec_input),
sleep_time=sleep_time,
)
@aws_verified
@pytest.mark.aws_verified
def test_state_machine_calling_dynamodb_put_and_delete(table_name=None, sleep_time=0):
dynamodb = boto3.client("dynamodb", "us-east-1")
exec_input = {
"TableName": table_name,
"Item1": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
"Item2": {"data": {"S": "HelloWorld"}, "id": {"S": "id2"}},
"Key": {"id": {"S": "id1"}},
}
expected_status = "SUCCEEDED"
tmpl_name = "services/dynamodb_put_delete_item"
def _verify_result(client, execution, execution_arn):
assert "stopDate" in execution
assert json.loads(execution["input"]) == exec_input
items = dynamodb.scan(TableName=table_name)["Items"]
assert items == [exec_input["Item2"]]
verify_execution_result(
_verify_result,
expected_status,
tmpl_name,
exec_input=json.dumps(exec_input),
sleep_time=sleep_time,
)