moto/tests/test_stepfunctions/parser/test_stepfunctions_sns_integration.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

39 lines
1.2 KiB
Python
Raw Normal View History

2024-03-16 14:03:41 +00:00
import json
from unittest import SkipTest
import boto3
from moto import mock_aws, settings
from moto.sns.models import sns_backends
from tests import DEFAULT_ACCOUNT_ID
from . import verify_execution_result
@mock_aws(config={"stepfunctions": {"execute_state_machine": True}})
def test_state_machine_calling_sns_publish():
if not settings.TEST_DECORATOR_MODE:
raise SkipTest("No point in testing this in ServerMode")
sns = boto3.client("sns", "us-east-1")
topic_arn = sns.create_topic(Name="mytopic")["TopicArn"]
exec_input = {"TopicArn": topic_arn, "Message": "my msg"}
expected_status = "SUCCEEDED"
tmpl_name = "services/sns_publish"
def _verify_result(client, execution, execution_arn):
assert "stopDate" in execution
assert json.loads(execution["input"]) == exec_input
assert "MessageId" in json.loads(execution["output"])
backend = sns_backends[DEFAULT_ACCOUNT_ID]["us-east-1"]
topic = list(backend.topics.values())[0]
notification = topic.sent_notifications[0]
_, msg, _, _, _ = notification
assert msg == "my msg"
verify_execution_result(
_verify_result, expected_status, tmpl_name, exec_input=json.dumps(exec_input)
)