AppSync - initial implementation (#4761)

This commit is contained in:
Bert Blommers 2022-01-14 20:12:26 -01:00 committed by GitHub
parent d118d592ca
commit f3313be991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1216 additions and 1 deletions

View File

@ -162,6 +162,61 @@
- [X] register_scalable_target - [X] register_scalable_target
</details> </details>
## appsync
<details>
<summary>30% implemented</summary>
- [ ] associate_api
- [ ] create_api_cache
- [X] create_api_key
- [ ] create_data_source
- [ ] create_domain_name
- [ ] create_function
- [X] create_graphql_api
- [ ] create_resolver
- [ ] create_type
- [ ] delete_api_cache
- [X] delete_api_key
- [ ] delete_data_source
- [ ] delete_domain_name
- [ ] delete_function
- [X] delete_graphql_api
- [ ] delete_resolver
- [ ] delete_type
- [ ] disassociate_api
- [ ] flush_api_cache
- [ ] get_api_association
- [ ] get_api_cache
- [ ] get_data_source
- [ ] get_domain_name
- [ ] get_function
- [X] get_graphql_api
- [ ] get_introspection_schema
- [ ] get_resolver
- [X] get_schema_creation_status
- [X] get_type
- [X] list_api_keys
- [ ] list_data_sources
- [ ] list_domain_names
- [ ] list_functions
- [X] list_graphql_apis
- [ ] list_resolvers
- [ ] list_resolvers_by_function
- [X] list_tags_for_resource
- [ ] list_types
- [X] start_schema_creation
- [X] tag_resource
- [X] untag_resource
- [ ] update_api_cache
- [X] update_api_key
- [ ] update_data_source
- [ ] update_domain_name
- [ ] update_function
- [X] update_graphql_api
- [ ] update_resolver
- [ ] update_type
</details>
## athena ## athena
<details> <details>
<summary>20% implemented</summary> <summary>20% implemented</summary>
@ -5132,7 +5187,6 @@
- appmesh - appmesh
- apprunner - apprunner
- appstream - appstream
- appsync
- auditmanager - auditmanager
- autoscaling-plans - autoscaling-plans
- backup - backup

View File

@ -0,0 +1,87 @@
.. _implementedservice_appsync:
.. |start-h3| raw:: html
<h3>
.. |end-h3| raw:: html
</h3>
=======
appsync
=======
.. autoclass:: moto.appsync.models.AppSyncBackend
|start-h3| Example usage |end-h3|
.. sourcecode:: python
@mock_appsync
def test_appsync_behaviour:
boto3.client("appsync")
...
|start-h3| Implemented features for this service |end-h3|
- [ ] associate_api
- [ ] create_api_cache
- [X] create_api_key
- [ ] create_data_source
- [ ] create_domain_name
- [ ] create_function
- [X] create_graphql_api
- [ ] create_resolver
- [ ] create_type
- [ ] delete_api_cache
- [X] delete_api_key
- [ ] delete_data_source
- [ ] delete_domain_name
- [ ] delete_function
- [X] delete_graphql_api
- [ ] delete_resolver
- [ ] delete_type
- [ ] disassociate_api
- [ ] flush_api_cache
- [ ] get_api_association
- [ ] get_api_cache
- [ ] get_data_source
- [ ] get_domain_name
- [ ] get_function
- [X] get_graphql_api
- [ ] get_introspection_schema
- [ ] get_resolver
- [X] get_schema_creation_status
- [X] get_type
- [X] list_api_keys
Pagination or the maxResults-parameter have not yet been implemented.
- [ ] list_data_sources
- [ ] list_domain_names
- [ ] list_functions
- [X] list_graphql_apis
Pagination or the maxResults-parameter have not yet been implemented.
- [ ] list_resolvers
- [ ] list_resolvers_by_function
- [X] list_tags_for_resource
- [ ] list_types
- [X] start_schema_creation
- [X] tag_resource
- [X] untag_resource
- [ ] update_api_cache
- [X] update_api_key
- [ ] update_data_source
- [ ] update_domain_name
- [ ] update_function
- [X] update_graphql_api
- [ ] update_resolver
- [ ] update_type

View File

@ -27,6 +27,7 @@ def lazy_load(
mock_acm = lazy_load(".acm", "mock_acm") mock_acm = lazy_load(".acm", "mock_acm")
mock_apigateway = lazy_load(".apigateway", "mock_apigateway") mock_apigateway = lazy_load(".apigateway", "mock_apigateway")
mock_apigateway_deprecated = lazy_load(".apigateway", "mock_apigateway_deprecated") mock_apigateway_deprecated = lazy_load(".apigateway", "mock_apigateway_deprecated")
mock_appsync = lazy_load(".appsync", "mock_appsync", boto3_name="appsync")
mock_athena = lazy_load(".athena", "mock_athena") mock_athena = lazy_load(".athena", "mock_athena")
mock_applicationautoscaling = lazy_load( mock_applicationautoscaling = lazy_load(
".applicationautoscaling", "mock_applicationautoscaling" ".applicationautoscaling", "mock_applicationautoscaling"

5
moto/appsync/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""appsync module initialization; sets value for base decorator."""
from .models import appsync_backends
from ..core.models import base_decorator
mock_appsync = base_decorator(appsync_backends)

View File

@ -0,0 +1,16 @@
import json
from moto.core.exceptions import JsonRESTError
class AppSyncExceptions(JsonRESTError):
pass
class GraphqlAPINotFound(AppSyncExceptions):
code = 404
def __init__(self, api_id):
super().__init__(
"NotFoundException", f"GraphQL API {api_id} not found.",
)
self.description = json.dumps({"message": self.message})

309
moto/appsync/models.py Normal file
View File

@ -0,0 +1,309 @@
import base64
from datetime import timedelta, datetime, timezone
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import BackendDict, unix_time
from moto.utilities.tagging_service import TaggingService
from uuid import uuid4
from .exceptions import GraphqlAPINotFound
class GraphqlSchema(BaseModel):
def __init__(self, definition):
self.definition = definition
# [graphql.language.ast.ObjectTypeDefinitionNode, ..]
self.types = []
self.status = "PROCESSING"
self.parse_error = None
self._parse_graphql_definition()
def get_type(self, name):
for graphql_type in self.types:
if graphql_type.name.value == name:
return {
"name": name,
"description": graphql_type.description.value
if graphql_type.description
else None,
"arn": f"arn:aws:appsync:graphql_type/{name}",
"definition": "NotYetImplemented",
}
def get_status(self):
return self.status, self.parse_error
def _parse_graphql_definition(self):
try:
from graphql import parse
from graphql.language.ast import ObjectTypeDefinitionNode
from graphql.error.graphql_error import GraphQLError
res = parse(self.definition)
for definition in res.definitions:
if isinstance(definition, ObjectTypeDefinitionNode):
self.types.append(definition)
self.status = "SUCCESS"
except GraphQLError as e:
self.status = "FAILED"
self.parse_error = str(e)
class GraphqlAPI(BaseModel):
def __init__(
self,
region,
name,
authentication_type,
additional_authentication_providers,
log_config,
xray_enabled,
user_pool_config,
open_id_connect_config,
lambda_authorizer_config,
):
self.region = region
self.name = name
self.api_id = str(uuid4())
self.authentication_type = authentication_type
self.additional_authentication_providers = additional_authentication_providers
self.lambda_authorizer_config = lambda_authorizer_config
self.log_config = log_config
self.open_id_connect_config = open_id_connect_config
self.user_pool_config = user_pool_config
self.xray_enabled = xray_enabled
self.arn = f"arn:aws:appsync:{self.region}:{ACCOUNT_ID}:apis/{self.api_id}"
self.graphql_schema = None
self.api_keys = dict()
def update(
self,
name,
additional_authentication_providers,
authentication_type,
lambda_authorizer_config,
log_config,
open_id_connect_config,
user_pool_config,
xray_enabled,
):
if name:
self.name = name
if additional_authentication_providers:
self.additional_authentication_providers = (
additional_authentication_providers
)
if authentication_type:
self.authentication_type = authentication_type
if lambda_authorizer_config:
self.lambda_authorizer_config = lambda_authorizer_config
if log_config:
self.log_config = log_config
if open_id_connect_config:
self.open_id_connect_config = open_id_connect_config
if user_pool_config:
self.user_pool_config = user_pool_config
if xray_enabled is not None:
self.xray_enabled = xray_enabled
def create_api_key(self, description, expires):
api_key = GraphqlAPIKey(description, expires)
self.api_keys[api_key.key_id] = api_key
return api_key
def list_api_keys(self):
return self.api_keys.values()
def delete_api_key(self, api_key_id):
self.api_keys.pop(api_key_id)
def update_api_key(self, api_key_id, description, expires):
api_key = self.api_keys[api_key_id]
api_key.update(description, expires)
return api_key
def start_schema_creation(self, definition):
graphql_definition = base64.b64decode(definition).decode("utf-8")
self.graphql_schema = GraphqlSchema(graphql_definition)
def get_schema_status(self):
return self.graphql_schema.get_status()
def get_type(self, type_name, type_format):
graphql_type = self.graphql_schema.get_type(type_name)
graphql_type["format"] = type_format
return graphql_type
def to_json(self):
return {
"name": self.name,
"apiId": self.api_id,
"authenticationType": self.authentication_type,
"arn": self.arn,
"uris": {"GRAPHQL": "http://graphql.uri"},
"additionalAuthenticationProviders": self.additional_authentication_providers,
"lambdaAuthorizerConfig": self.lambda_authorizer_config,
"logConfig": self.log_config,
"openIDConnectConfig": self.open_id_connect_config,
"userPoolConfig": self.user_pool_config,
"xrayEnabled": self.xray_enabled,
}
class GraphqlAPIKey(BaseModel):
def __init__(self, description, expires):
self.key_id = str(uuid4())[0:6]
self.description = description
self.expires = expires
if not self.expires:
default_expiry = datetime.now(timezone.utc)
default_expiry = default_expiry.replace(
minute=0, second=0, microsecond=0, tzinfo=None
)
default_expiry = default_expiry + timedelta(days=7)
self.expires = unix_time(default_expiry)
def update(self, description, expires):
if description:
self.description = description
if expires:
self.expires = expires
def to_json(self):
return {
"id": self.key_id,
"description": self.description,
"expires": self.expires,
"deletes": self.expires,
}
class AppSyncBackend(BaseBackend):
"""Implementation of AppSync APIs."""
def __init__(self, region_name=None):
self.region_name = region_name
self.graphql_apis = dict()
self.tagger = TaggingService()
def reset(self):
"""Re-initialize all attributes for this instance."""
region_name = self.region_name
self.__dict__ = {}
self.__init__(region_name)
def create_graphql_api(
self,
name,
log_config,
authentication_type,
user_pool_config,
open_id_connect_config,
additional_authentication_providers,
xray_enabled,
lambda_authorizer_config,
tags,
):
graphql_api = GraphqlAPI(
region=self.region_name,
name=name,
authentication_type=authentication_type,
additional_authentication_providers=additional_authentication_providers,
log_config=log_config,
xray_enabled=xray_enabled,
user_pool_config=user_pool_config,
open_id_connect_config=open_id_connect_config,
lambda_authorizer_config=lambda_authorizer_config,
)
self.graphql_apis[graphql_api.api_id] = graphql_api
self.tagger.tag_resource(
graphql_api.arn, TaggingService.convert_dict_to_tags_input(tags)
)
return graphql_api
def update_graphql_api(
self,
api_id,
name,
log_config,
authentication_type,
user_pool_config,
open_id_connect_config,
additional_authentication_providers,
xray_enabled,
lambda_authorizer_config,
):
graphql_api = self.graphql_apis[api_id]
graphql_api.update(
name,
additional_authentication_providers,
authentication_type,
lambda_authorizer_config,
log_config,
open_id_connect_config,
user_pool_config,
xray_enabled,
)
return graphql_api
def get_graphql_api(self, api_id):
if api_id not in self.graphql_apis:
raise GraphqlAPINotFound(api_id)
return self.graphql_apis[api_id]
def delete_graphql_api(self, api_id):
self.graphql_apis.pop(api_id)
def list_graphql_apis(self):
"""
Pagination or the maxResults-parameter have not yet been implemented.
"""
return self.graphql_apis.values()
def create_api_key(self, api_id, description, expires):
return self.graphql_apis[api_id].create_api_key(description, expires)
def delete_api_key(self, api_id, api_key_id):
self.graphql_apis[api_id].delete_api_key(api_key_id)
def list_api_keys(self, api_id):
"""
Pagination or the maxResults-parameter have not yet been implemented.
"""
if api_id in self.graphql_apis:
return self.graphql_apis[api_id].list_api_keys()
else:
return []
def update_api_key(self, api_id, api_key_id, description, expires):
return self.graphql_apis[api_id].update_api_key(
api_key_id, description, expires
)
def start_schema_creation(self, api_id, definition):
self.graphql_apis[api_id].start_schema_creation(definition)
return "PROCESSING"
def get_schema_creation_status(self, api_id):
return self.graphql_apis[api_id].get_schema_status()
def tag_resource(self, resource_arn, tags):
self.tagger.tag_resource(
resource_arn, TaggingService.convert_dict_to_tags_input(tags)
)
def untag_resource(self, resource_arn, tag_keys):
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
def list_tags_for_resource(self, resource_arn):
return self.tagger.get_tag_dict_for_resource(resource_arn)
def get_type(self, api_id, type_name, type_format):
return self.graphql_apis[api_id].get_type(type_name, type_format)
appsync_backends = BackendDict(AppSyncBackend, "appsync")

250
moto/appsync/responses.py Normal file
View File

@ -0,0 +1,250 @@
"""Handles incoming appsync requests, invokes methods, returns responses."""
import json
from functools import wraps
from moto.core.responses import BaseResponse
from urllib.parse import unquote
from .exceptions import AppSyncExceptions
from .models import appsync_backends
def error_handler(f):
@wraps(f)
def _wrapper(*args, **kwargs):
try:
return f(*args, **kwargs)
except AppSyncExceptions as e:
return e.code, e.get_headers(), e.get_body()
return _wrapper
class AppSyncResponse(BaseResponse):
"""Handler for AppSync requests and responses."""
@property
def appsync_backend(self):
"""Return backend instance specific for this region."""
return appsync_backends[self.region]
def graph_ql(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_graphql_api()
if request.method == "GET":
return self.list_graphql_apis()
@error_handler
def graph_ql_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.get_graphql_api()
if request.method == "DELETE":
return self.delete_graphql_api()
if request.method == "POST":
return self.update_graphql_api()
def api_key(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.create_api_key()
if request.method == "GET":
return self.list_api_keys()
def schemacreation(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.start_schema_creation()
if request.method == "GET":
return self.get_schema_creation_status()
def api_key_individual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "DELETE":
return self.delete_api_key()
if request.method == "POST":
return self.update_api_key()
def tags(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "POST":
return self.tag_resource()
if request.method == "DELETE":
return self.untag_resource()
if request.method == "GET":
return self.list_tags_for_resource()
def types(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.get_type()
def create_graphql_api(self):
params = json.loads(self.body)
name = params.get("name")
log_config = params.get("logConfig")
authentication_type = params.get("authenticationType")
user_pool_config = params.get("userPoolConfig")
open_id_connect_config = params.get("openIDConnectConfig")
tags = params.get("tags")
additional_authentication_providers = params.get(
"additionalAuthenticationProviders"
)
xray_enabled = params.get("xrayEnabled", False)
lambda_authorizer_config = params.get("lambdaAuthorizerConfig")
graphql_api = self.appsync_backend.create_graphql_api(
name=name,
log_config=log_config,
authentication_type=authentication_type,
user_pool_config=user_pool_config,
open_id_connect_config=open_id_connect_config,
additional_authentication_providers=additional_authentication_providers,
xray_enabled=xray_enabled,
lambda_authorizer_config=lambda_authorizer_config,
tags=tags,
)
response = graphql_api.to_json()
response["tags"] = self.appsync_backend.list_tags_for_resource(graphql_api.arn)
return 200, {}, json.dumps(dict(graphqlApi=response))
def get_graphql_api(self):
api_id = self.path.split("/")[-1]
graphql_api = self.appsync_backend.get_graphql_api(api_id=api_id)
response = graphql_api.to_json()
response["tags"] = self.appsync_backend.list_tags_for_resource(graphql_api.arn)
return 200, {}, json.dumps(dict(graphqlApi=response))
def delete_graphql_api(self):
api_id = self.path.split("/")[-1]
self.appsync_backend.delete_graphql_api(api_id=api_id)
return 200, {}, json.dumps(dict())
def update_graphql_api(self):
api_id = self.path.split("/")[-1]
params = json.loads(self.body)
name = params.get("name")
log_config = params.get("logConfig")
authentication_type = params.get("authenticationType")
user_pool_config = params.get("userPoolConfig")
print(user_pool_config)
open_id_connect_config = params.get("openIDConnectConfig")
additional_authentication_providers = params.get(
"additionalAuthenticationProviders"
)
xray_enabled = params.get("xrayEnabled", False)
lambda_authorizer_config = params.get("lambdaAuthorizerConfig")
api = self.appsync_backend.update_graphql_api(
api_id=api_id,
name=name,
log_config=log_config,
authentication_type=authentication_type,
user_pool_config=user_pool_config,
open_id_connect_config=open_id_connect_config,
additional_authentication_providers=additional_authentication_providers,
xray_enabled=xray_enabled,
lambda_authorizer_config=lambda_authorizer_config,
)
return 200, {}, json.dumps(dict(graphqlApi=api.to_json()))
def list_graphql_apis(self):
graphql_apis = self.appsync_backend.list_graphql_apis()
return (
200,
{},
json.dumps(dict(graphqlApis=[api.to_json() for api in graphql_apis])),
)
def create_api_key(self):
params = json.loads(self.body)
# /v1/apis/[api_id]/apikeys
api_id = self.path.split("/")[-2]
description = params.get("description")
expires = params.get("expires")
api_key = self.appsync_backend.create_api_key(
api_id=api_id, description=description, expires=expires,
)
print(api_key.to_json())
return 200, {}, json.dumps(dict(apiKey=api_key.to_json()))
def delete_api_key(self):
api_id = self.path.split("/")[-3]
api_key_id = self.path.split("/")[-1]
self.appsync_backend.delete_api_key(
api_id=api_id, api_key_id=api_key_id,
)
return 200, {}, json.dumps(dict())
def list_api_keys(self):
# /v1/apis/[api_id]/apikeys
api_id = self.path.split("/")[-2]
api_keys = self.appsync_backend.list_api_keys(api_id=api_id)
return 200, {}, json.dumps(dict(apiKeys=[key.to_json() for key in api_keys]))
def update_api_key(self):
api_id = self.path.split("/")[-3]
api_key_id = self.path.split("/")[-1]
params = json.loads(self.body)
description = params.get("description")
expires = params.get("expires")
api_key = self.appsync_backend.update_api_key(
api_id=api_id,
api_key_id=api_key_id,
description=description,
expires=expires,
)
return 200, {}, json.dumps(dict(apiKey=api_key.to_json()))
def start_schema_creation(self):
params = json.loads(self.body)
api_id = self.path.split("/")[-2]
definition = params.get("definition")
status = self.appsync_backend.start_schema_creation(
api_id=api_id, definition=definition,
)
return 200, {}, json.dumps({"status": status})
def get_schema_creation_status(self):
api_id = self.path.split("/")[-2]
status, details = self.appsync_backend.get_schema_creation_status(
api_id=api_id,
)
return 200, {}, json.dumps(dict(status=status, details=details))
def tag_resource(self):
resource_arn = self._extract_arn_from_path()
params = json.loads(self.body)
tags = params.get("tags")
self.appsync_backend.tag_resource(
resource_arn=resource_arn, tags=tags,
)
return 200, {}, json.dumps(dict())
def untag_resource(self):
resource_arn = self._extract_arn_from_path()
tag_keys = self.querystring.get("tagKeys", [])
self.appsync_backend.untag_resource(
resource_arn=resource_arn, tag_keys=tag_keys,
)
return 200, {}, json.dumps(dict())
def list_tags_for_resource(self):
resource_arn = self._extract_arn_from_path()
tags = self.appsync_backend.list_tags_for_resource(resource_arn=resource_arn,)
return 200, {}, json.dumps(dict(tags=tags))
def _extract_arn_from_path(self):
# /v1/tags/arn_that_may_contain_a_slash
path = unquote(self.path)
return "/".join(path.split("/")[3:])
def get_type(self):
api_id = unquote(self.path.split("/")[-3])
type_name = self.path.split("/")[-1]
type_format = self.querystring.get("format")[0]
graphql_type = self.appsync_backend.get_type(
api_id=api_id, type_name=type_name, type_format=type_format,
)
return 200, {}, json.dumps(dict(type=graphql_type))

20
moto/appsync/urls.py Normal file
View File

@ -0,0 +1,20 @@
"""appsync base URL and path."""
from .responses import AppSyncResponse
url_bases = [
r"https?://appsync\.(.+)\.amazonaws\.com",
]
response = AppSyncResponse()
url_paths = {
"{0}/v1/apis$": response.graph_ql,
"{0}/v1/apis/(?P<api_id>[^/]+)$": response.graph_ql_individual,
"{0}/v1/apis/(?P<api_id>[^/]+)/apikeys$": response.api_key,
"{0}/v1/apis/(?P<api_id>[^/]+)/apikeys/(?P<api_key_id>[^/]+)$": response.api_key_individual,
"{0}/v1/apis/(?P<api_id>[^/]+)/schemacreation$": response.schemacreation,
"{0}/v1/tags/(?P<resource_arn>.+)$": response.tags,
"{0}/v1/apis/(?P<api_id>[^/]+)/types/(?P<type_name>.+)$": response.types,
}

View File

@ -8,6 +8,7 @@ backend_url_patterns = [
"applicationautoscaling", "applicationautoscaling",
re.compile("https?://application-autoscaling\\.(.+)\\.amazonaws.com"), re.compile("https?://application-autoscaling\\.(.+)\\.amazonaws.com"),
), ),
("appsync", re.compile("https?://appsync\\.(.+)\\.amazonaws\\.com")),
("athena", re.compile("https?://athena\\.(.+)\\.amazonaws\\.com")), ("athena", re.compile("https?://athena\\.(.+)\\.amazonaws\\.com")),
("autoscaling", re.compile("https?://autoscaling\\.(.+)\\.amazonaws\\.com")), ("autoscaling", re.compile("https?://autoscaling\\.(.+)\\.amazonaws\\.com")),
("batch", re.compile("https?://batch\\.(.+)\\.amazonaws.com")), ("batch", re.compile("https?://batch\\.(.+)\\.amazonaws.com")),

View File

@ -168,4 +168,6 @@ class TaggingService:
@staticmethod @staticmethod
def convert_dict_to_tags_input(tags): def convert_dict_to_tags_input(tags):
""" Given a dictionary, return generic boto params for tags """ """ Given a dictionary, return generic boto params for tags """
if not tags:
return []
return [{"Key": k, "Value": v} for (k, v) in tags.items()] return [{"Key": k, "Value": v} for (k, v) in tags.items()]

View File

@ -49,6 +49,7 @@ _dep_python_jose_ecdsa_pin = (
) )
_dep_dataclasses = "dataclasses; python_version < '3.7'" _dep_dataclasses = "dataclasses; python_version < '3.7'"
_dep_docker = "docker>=2.5.1" _dep_docker = "docker>=2.5.1"
_dep_graphql = "graphql-core"
_dep_jsondiff = "jsondiff>=1.1.2" _dep_jsondiff = "jsondiff>=1.1.2"
_dep_aws_xray_sdk = "aws-xray-sdk!=0.96,>=0.93" _dep_aws_xray_sdk = "aws-xray-sdk!=0.96,>=0.93"
_dep_idna = "idna<4,>=2.5" _dep_idna = "idna<4,>=2.5"
@ -61,6 +62,7 @@ all_extra_deps = [
_dep_python_jose, _dep_python_jose,
_dep_python_jose_ecdsa_pin, _dep_python_jose_ecdsa_pin,
_dep_docker, _dep_docker,
_dep_graphql,
_dep_jsondiff, _dep_jsondiff,
_dep_aws_xray_sdk, _dep_aws_xray_sdk,
_dep_idna, _dep_idna,
@ -80,6 +82,7 @@ for service_name in [
extras_per_service.update( extras_per_service.update(
{ {
"apigateway": [_dep_python_jose, _dep_python_jose_ecdsa_pin], "apigateway": [_dep_python_jose, _dep_python_jose_ecdsa_pin],
"appsync": [_dep_graphql],
"awslambda": [_dep_docker], "awslambda": [_dep_docker],
"batch": [_dep_docker], "batch": [_dep_docker],
"cloudformation": [_dep_docker, _dep_PyYAML, _dep_cfn_lint], "cloudformation": [_dep_docker, _dep_PyYAML, _dep_cfn_lint],

View File

@ -1,5 +1,7 @@
TestAccAWSAccessKey TestAccAWSAccessKey
TestAccAWSAvailabilityZones TestAccAWSAvailabilityZones
TestAccAWSAppsyncApiKey
TestAccAWSAppsyncGraphqlApi
TestAccAWSBillingServiceAccount TestAccAWSBillingServiceAccount
TestAccAWSCallerIdentity TestAccAWSCallerIdentity
TestAccAWSCloudTrailServiceAccount TestAccAWSCloudTrailServiceAccount

View File

View File

@ -0,0 +1,164 @@
import boto3
import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError
from moto import mock_appsync
from moto.core import ACCOUNT_ID
# See our Development Tips on writing tests for hints on how to write good tests:
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
@mock_appsync
def test_create_graphql_api():
client = boto3.client("appsync", region_name="ap-southeast-1")
resp = client.create_graphql_api(name="api1", authenticationType="API_KEY")
resp.should.have.key("graphqlApi")
api = resp["graphqlApi"]
api.should.have.key("name").equals("api1")
api.should.have.key("apiId")
api.should.have.key("authenticationType").equals("API_KEY")
api.should.have.key("arn").equals(
f"arn:aws:appsync:ap-southeast-1:{ACCOUNT_ID}:apis/{api['apiId']}"
)
api.should.have.key("uris").equals({"GRAPHQL": "http://graphql.uri"})
api.should.have.key("xrayEnabled").equals(False)
api.shouldnt.have.key("additionalAuthenticationProviders")
api.shouldnt.have.key("logConfig")
@mock_appsync
def test_create_graphql_api_advanced():
client = boto3.client("appsync", region_name="ap-southeast-1")
resp = client.create_graphql_api(
name="api1",
authenticationType="API_KEY",
additionalAuthenticationProviders=[{"authenticationType": "API_KEY"}],
logConfig={
"fieldLogLevel": "ERROR",
"cloudWatchLogsRoleArn": "arn:aws:cloudwatch:role",
},
xrayEnabled=True,
)
resp.should.have.key("graphqlApi")
api = resp["graphqlApi"]
api.should.have.key("name").equals("api1")
api.should.have.key("apiId")
api.should.have.key("authenticationType").equals("API_KEY")
api.should.have.key("arn").equals(
f"arn:aws:appsync:ap-southeast-1:{ACCOUNT_ID}:apis/{api['apiId']}"
)
api.should.have.key("uris").equals({"GRAPHQL": "http://graphql.uri"})
api.should.have.key("additionalAuthenticationProviders").equals(
[{"authenticationType": "API_KEY"}]
)
api.should.have.key("logConfig").equals(
{"cloudWatchLogsRoleArn": "arn:aws:cloudwatch:role", "fieldLogLevel": "ERROR"}
)
api.should.have.key("xrayEnabled").equals(True)
@mock_appsync
def test_get_graphql_api():
client = boto3.client("appsync", region_name="ap-southeast-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
resp = client.get_graphql_api(apiId=api_id)
resp.should.have.key("graphqlApi")
api = resp["graphqlApi"]
api.should.have.key("name").equals("api1")
api.should.have.key("apiId")
api.should.have.key("authenticationType").equals("API_KEY")
@mock_appsync
def test_update_graphql_api():
client = boto3.client("appsync", region_name="ap-southeast-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
client.update_graphql_api(
apiId=api_id,
name="api2",
authenticationType="AWS_IAM",
logConfig={
"cloudWatchLogsRoleArn": "arn:aws:cloudwatch:role",
"fieldLogLevel": "ERROR",
},
userPoolConfig={
"awsRegion": "us-east-1",
"defaultAction": "DENY",
"userPoolId": "us-east-1_391729ed4a2d430a9d2abadecfc1ab86",
},
xrayEnabled=True,
)
graphql_api = client.get_graphql_api(apiId=api_id)["graphqlApi"]
graphql_api.should.have.key("name").equals("api2")
graphql_api.should.have.key("authenticationType").equals("AWS_IAM")
graphql_api.should.have.key("arn").equals(
f"arn:aws:appsync:ap-southeast-1:{ACCOUNT_ID}:apis/{graphql_api['apiId']}"
)
graphql_api.should.have.key("logConfig").equals(
{"cloudWatchLogsRoleArn": "arn:aws:cloudwatch:role", "fieldLogLevel": "ERROR"}
)
graphql_api.should.have.key("userPoolConfig").equals(
{
"awsRegion": "us-east-1",
"defaultAction": "DENY",
"userPoolId": "us-east-1_391729ed4a2d430a9d2abadecfc1ab86",
}
)
graphql_api.should.have.key("xrayEnabled").equals(True)
@mock_appsync
def test_get_graphql_api_unknown():
client = boto3.client("appsync", region_name="ap-southeast-1")
with pytest.raises(ClientError) as exc:
client.get_graphql_api(apiId="unknown")
err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException")
err["Message"].should.equal("GraphQL API unknown not found.")
@mock_appsync
def test_delete_graphql_api():
client = boto3.client("appsync", region_name="eu-west-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
resp = client.list_graphql_apis()
resp.should.have.key("graphqlApis").length_of(1)
client.delete_graphql_api(apiId=api_id)
resp = client.list_graphql_apis()
resp.should.have.key("graphqlApis").length_of(0)
@mock_appsync
def test_list_graphql_apis():
client = boto3.client("appsync", region_name="ap-southeast-1")
resp = client.list_graphql_apis()
resp.should.have.key("graphqlApis").equals([])
for _ in range(3):
client.create_graphql_api(name="api1", authenticationType="API_KEY")
resp = client.list_graphql_apis()
resp.should.have.key("graphqlApis").length_of(3)

View File

@ -0,0 +1,112 @@
import boto3
from datetime import timedelta, datetime
from moto import mock_appsync
from moto.core.utils import unix_time
@mock_appsync
def test_create_api_key_simple():
client = boto3.client("appsync", region_name="eu-west-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
resp = client.create_api_key(apiId=api_id)
resp.should.have.key("apiKey")
api_key = resp["apiKey"]
api_key.should.have.key("id")
api_key.shouldnt.have.key("description")
api_key.should.have.key("expires")
api_key.should.have.key("deletes")
@mock_appsync
def test_create_api_key():
client = boto3.client("appsync", region_name="ap-southeast-1")
tomorrow = datetime.now() + timedelta(days=1)
tomorrow_in_secs = int(unix_time(tomorrow))
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
resp = client.create_api_key(
apiId=api_id, description="my first api key", expires=tomorrow_in_secs
)
resp.should.have.key("apiKey")
api_key = resp["apiKey"]
api_key.should.have.key("id")
api_key.should.have.key("description").equals("my first api key")
api_key.should.have.key("expires").equals(tomorrow_in_secs)
api_key.should.have.key("deletes").equals(tomorrow_in_secs)
@mock_appsync
def test_delete_api_key():
client = boto3.client("appsync", region_name="us-east-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
api_key_id = client.create_api_key(apiId=api_id)["apiKey"]["id"]
client.delete_api_key(apiId=api_id, id=api_key_id)
resp = client.list_api_keys(apiId=api_id)
resp.should.have.key("apiKeys").length_of(0)
@mock_appsync
def test_list_api_keys_unknown_api():
client = boto3.client("appsync", region_name="ap-southeast-1")
resp = client.list_api_keys(apiId="unknown")
resp.should.have.key("apiKeys").equals([])
@mock_appsync
def test_list_api_keys_empty():
client = boto3.client("appsync", region_name="ap-southeast-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
resp = client.list_api_keys(apiId=api_id)
resp.should.have.key("apiKeys").equals([])
@mock_appsync
def test_list_api_keys():
client = boto3.client("appsync", region_name="ap-southeast-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
client.create_api_key(apiId=api_id)
client.create_api_key(apiId=api_id, description="my first api key")
resp = client.list_api_keys(apiId=api_id)
resp.should.have.key("apiKeys").length_of(2)
@mock_appsync
def test_update_api_key():
client = boto3.client("appsync", region_name="eu-west-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
original = client.create_api_key(apiId=api_id, description="my first api key")[
"apiKey"
]
updated = client.update_api_key(
apiId=api_id, id=original["id"], description="my second api key"
)["apiKey"]
updated.should.have.key("id").equals(original["id"])
updated.should.have.key("description").equals("my second api key")
updated.should.have.key("expires").equals(original["expires"])
updated.should.have.key("deletes").equals(original["deletes"])

View File

@ -0,0 +1,88 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_appsync
schema = """type Mutation {
putPost(id: ID!, title: String!): Post
}
"My custom post type"
type Post {
id: ID!
title: String!
}
type Query {
singlePost(id: ID!): Post
}
schema {
query: Query
mutation: Mutation
}"""
@mock_appsync
def test_start_schema_creation():
client = boto3.client("appsync", region_name="us-east-2")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
resp = client.start_schema_creation(apiId=api_id, definition=b"sth")
resp.should.have.key("status").equals("PROCESSING")
@mock_appsync
def test_get_schema_creation_status():
client = boto3.client("appsync", region_name="eu-west-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
client.start_schema_creation(apiId=api_id, definition=schema.encode("utf-8"))
resp = client.get_schema_creation_status(apiId=api_id)
resp.should.have.key("status").equals("SUCCESS")
resp.shouldnt.have.key("details")
@mock_appsync
def test_get_schema_creation_status_invalid():
client = boto3.client("appsync", region_name="eu-west-1")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
client.start_schema_creation(apiId=api_id, definition=b"sth")
resp = client.get_schema_creation_status(apiId=api_id)
resp.should.have.key("status").equals("FAILED")
resp.should.have.key("details").match("Syntax Error")
@mock_appsync
def test_get_type_from_schema():
client = boto3.client("appsync", region_name="us-east-2")
api_id = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]["apiId"]
client.start_schema_creation(apiId=api_id, definition=schema.encode("utf-8"))
resp = client.get_type(apiId=api_id, typeName="Post", format="SDL")
resp.should.have.key("type")
graphql_type = resp["type"]
graphql_type.should.have.key("name").equals("Post")
graphql_type.should.have.key("description").equals("My custom post type")
graphql_type.should.have.key("arn").equals("arn:aws:appsync:graphql_type/Post")
graphql_type.should.have.key("definition").equals("NotYetImplemented")
graphql_type.should.have.key("format").equals("SDL")
query_type = client.get_type(apiId=api_id, typeName="Query", format="SDL")["type"]
query_type.should.have.key("name").equals("Query")
query_type.shouldnt.have.key("description")

View File

@ -0,0 +1,86 @@
import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_appsync
@mock_appsync
def test_create_graphql_api_with_tags():
client = boto3.client("appsync", region_name="ap-southeast-1")
api = client.create_graphql_api(
name="api1", authenticationType="API_KEY", tags={"key": "val", "key2": "val2"}
)["graphqlApi"]
api.should.have.key("tags").equals({"key": "val", "key2": "val2"})
api = client.get_graphql_api(apiId=api["apiId"])["graphqlApi"]
api.should.have.key("tags").equals({"key": "val", "key2": "val2"})
@mock_appsync
def test_tag_resource():
client = boto3.client("appsync", region_name="us-east-2")
api = client.create_graphql_api(name="api1", authenticationType="API_KEY")[
"graphqlApi"
]
client.tag_resource(resourceArn=(api["arn"]), tags={"key1": "val1"})
api = client.get_graphql_api(apiId=api["apiId"])["graphqlApi"]
api.should.have.key("tags").equals({"key1": "val1"})
@mock_appsync
def test_tag_resource_with_existing_tags():
client = boto3.client("appsync", region_name="us-east-2")
api = client.create_graphql_api(
name="api1", authenticationType="API_KEY", tags={"key": "val", "key2": "val2"}
)["graphqlApi"]
client.untag_resource(resourceArn=api["arn"], tagKeys=["key"])
client.tag_resource(
resourceArn=(api["arn"]), tags={"key2": "new value", "key3": "val3"}
)
api = client.get_graphql_api(apiId=api["apiId"])["graphqlApi"]
api.should.have.key("tags").equals({"key2": "new value", "key3": "val3"})
@mock_appsync
def test_untag_resource():
client = boto3.client("appsync", region_name="eu-west-1")
api = client.create_graphql_api(
name="api1", authenticationType="API_KEY", tags={"key": "val", "key2": "val2"}
)["graphqlApi"]
client.untag_resource(resourceArn=api["arn"], tagKeys=["key"])
api = client.get_graphql_api(apiId=api["apiId"])["graphqlApi"]
api.should.have.key("tags").equals({"key2": "val2"})
@mock_appsync
def test_untag_resource_all():
client = boto3.client("appsync", region_name="eu-west-1")
api = client.create_graphql_api(
name="api1", authenticationType="API_KEY", tags={"key": "val", "key2": "val2"}
)["graphqlApi"]
client.untag_resource(resourceArn=api["arn"], tagKeys=["key", "key2"])
api = client.get_graphql_api(apiId=api["apiId"])["graphqlApi"]
api.should.have.key("tags").equals({})
@mock_appsync
def test_list_tags_for_resource():
client = boto3.client("appsync", region_name="ap-southeast-1")
api = client.create_graphql_api(
name="api1", authenticationType="API_KEY", tags={"key": "val", "key2": "val2"}
)["graphqlApi"]
resp = client.list_tags_for_resource(resourceArn=api["arn"])
resp.should.have.key("tags").equals({"key": "val", "key2": "val2"})

View File

@ -0,0 +1,15 @@
import json
import sure # noqa # pylint: disable=unused-import
import moto.server as server
def test_appsync_list_tags_for_resource():
backend = server.create_backend_app("appsync")
test_client = backend.test_client()
resp = test_client.get(
"/v1/tags/arn%3Aaws%3Aappsync%3Aus-east-1%3A123456789012%3Aapis%2Ff405dd93-855e-451d-ab00-7325b8e439c6?tagKeys=Description"
)
resp.status_code.should.equal(200)
json.loads(resp.data).should.equals({"tags": {}})