131 lines
3.9 KiB
Python
131 lines
3.9 KiB
Python
import json
|
|
import os
|
|
from functools import wraps
|
|
from uuid import uuid4
|
|
|
|
import boto3
|
|
import pytest
|
|
import requests
|
|
|
|
from moto import mock_aws
|
|
|
|
from . import base_url, verify_execution_result
|
|
|
|
|
|
def aws_verified(func):
|
|
"""
|
|
Function that is verified to work against AWS.
|
|
Can be run against AWS at any time by setting:
|
|
MOTO_TEST_ALLOW_AWS_REQUEST=true
|
|
|
|
If this environment variable is not set, the function runs in a `mock_aws` context.
|
|
|
|
This decorator will:
|
|
- Create an IAM-role that can be used by AWSLambda functions table
|
|
- Run the test
|
|
- Delete the role
|
|
"""
|
|
|
|
@wraps(func)
|
|
def pagination_wrapper():
|
|
table_name = "table_" + str(uuid4())[0:6]
|
|
|
|
allow_aws_request = (
|
|
os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true"
|
|
)
|
|
|
|
if allow_aws_request:
|
|
return create_table_and_test(table_name, sleep_time=10)
|
|
else:
|
|
with mock_aws():
|
|
requests.post(
|
|
f"http://{base_url}/moto-api/config",
|
|
json={"stepfunctions": {"execute_state_machine": True}},
|
|
)
|
|
resp = create_table_and_test(table_name, sleep_time=0)
|
|
requests.post(
|
|
f"http://{base_url}/moto-api/config",
|
|
json={"stepfunctions": {"execute_state_machine": False}},
|
|
)
|
|
return resp
|
|
|
|
def create_table_and_test(table_name, sleep_time):
|
|
client = boto3.client("dynamodb", region_name="us-east-1")
|
|
|
|
client.create_table(
|
|
TableName=table_name,
|
|
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
|
|
AttributeDefinitions=[{"AttributeName": "id", "AttributeType": "S"}],
|
|
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 5},
|
|
Tags=[{"Key": "environment", "Value": "moto_tests"}],
|
|
)
|
|
waiter = client.get_waiter("table_exists")
|
|
waiter.wait(TableName=table_name)
|
|
try:
|
|
resp = func(table_name, sleep_time)
|
|
finally:
|
|
### CLEANUP ###
|
|
client.delete_table(TableName=table_name)
|
|
|
|
return resp
|
|
|
|
return pagination_wrapper
|
|
|
|
|
|
@aws_verified
|
|
@pytest.mark.aws_verified
|
|
def test_state_machine_calling_dynamodb_put(table_name=None, sleep_time=0):
|
|
dynamodb = boto3.client("dynamodb", "us-east-1")
|
|
exec_input = {
|
|
"TableName": table_name,
|
|
"Item": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
|
|
}
|
|
|
|
expected_status = "SUCCEEDED"
|
|
tmpl_name = "services/dynamodb_put_item"
|
|
|
|
def _verify_result(client, execution, execution_arn):
|
|
assert "stopDate" in execution
|
|
assert json.loads(execution["input"]) == exec_input
|
|
|
|
items = dynamodb.scan(TableName=table_name)["Items"]
|
|
assert items == [exec_input["Item"]]
|
|
|
|
verify_execution_result(
|
|
_verify_result,
|
|
expected_status,
|
|
tmpl_name,
|
|
exec_input=json.dumps(exec_input),
|
|
sleep_time=sleep_time,
|
|
)
|
|
|
|
|
|
@aws_verified
|
|
@pytest.mark.aws_verified
|
|
def test_state_machine_calling_dynamodb_put_and_delete(table_name=None, sleep_time=0):
|
|
dynamodb = boto3.client("dynamodb", "us-east-1")
|
|
exec_input = {
|
|
"TableName": table_name,
|
|
"Item1": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
|
|
"Item2": {"data": {"S": "HelloWorld"}, "id": {"S": "id2"}},
|
|
"Key": {"id": {"S": "id1"}},
|
|
}
|
|
|
|
expected_status = "SUCCEEDED"
|
|
tmpl_name = "services/dynamodb_put_delete_item"
|
|
|
|
def _verify_result(client, execution, execution_arn):
|
|
assert "stopDate" in execution
|
|
assert json.loads(execution["input"]) == exec_input
|
|
|
|
items = dynamodb.scan(TableName=table_name)["Items"]
|
|
assert items == [exec_input["Item2"]]
|
|
|
|
verify_execution_result(
|
|
_verify_result,
|
|
expected_status,
|
|
tmpl_name,
|
|
exec_input=json.dumps(exec_input),
|
|
sleep_time=sleep_time,
|
|
)
|