import os from functools import wraps from unittest import SkipTest import boto3 import botocore from moto import mock_aws def sns_aws_verified(func): """ Function that is verified to work against AWS. Can be run against AWS at any time by setting: MOTO_TEST_ALLOW_AWS_REQUEST=true If this environment variable is not set, the function runs in a `mock_aws` context. """ @wraps(func) def pagination_wrapper(): allow_aws_request = ( os.environ.get("MOTO_TEST_ALLOW_AWS_REQUEST", "false").lower() == "true" ) if allow_aws_request: ssm = boto3.client("ssm", "us-east-1") try: param = ssm.get_parameter( Name="/moto/tests/ses/firebase_api_key", WithDecryption=True ) api_key = param["Parameter"]["Value"] resp = func(api_key) except botocore.exceptions.ClientError: # SNS tests try to create a PlatformApplication that connects to GCM # (Google Cloud Messaging, also known as Firebase Messaging) # That requires an account and an API key # If the API key has not been configured in SSM, we'll just skip the test # # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns/client/create_platform_application.html # AWS calls it 'API key', but Firebase calls it Server Key # # https://stackoverflow.com/a/75896532/13245310 raise SkipTest("Can't execute SNS tests without Firebase API key") else: with mock_aws(): resp = func("mock_api_key") return resp return pagination_wrapper