aws apigateway create,get domain names

This commit is contained in:
usmankb 2020-04-08 03:18:42 +05:30
parent 452e63977e
commit 856c07de63
5 changed files with 169 additions and 1 deletions

View File

@ -119,3 +119,11 @@ class ApiKeyAlreadyExists(RESTError):
super(ApiKeyAlreadyExists, self).__init__(
"ConflictException", "API Key already exists"
)
class DomainNameNotFound(RESTError):
code = 404
def __init__(self):
super(DomainNameNotFound, self).__init__(
"NotFoundException", "Invalid Domain Name specified"
)

View File

@ -34,6 +34,7 @@ from .exceptions import (
NoIntegrationDefined,
NoMethodDefined,
ApiKeyAlreadyExists,
DomainNameNotFound
)
STAGE_URL = "https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}"
@ -463,7 +464,6 @@ class RestAPI(BaseModel):
self.deployments = {}
self.authorizers = {}
self.stages = {}
self.resources = {}
self.add_child("/") # Add default child
@ -609,6 +609,51 @@ class RestAPI(BaseModel):
return self.deployments.pop(deployment_id)
class DomainName(BaseModel,dict):
def __init__(self, domain_name, **kwargs):
super(DomainName, self).__init__()
self["domainName"] = domain_name
self["regionalDomainName"] = domain_name,
self["distributionDomainName"] = domain_name,
self["domainNameStatus"] = "AVAILABLE"
self["domainNameStatusMessage"] = "Domain Name Available"
self["regionalHostedZoneId"] = "Z2FDTNDATAQYW2"
self["distributionHostedZoneId"] = "Z2FDTNDATAQYW2"
self["certificateUploadDate"] = int(time.time())
if kwargs.get("certificate_name"):
self["certificateName"] = kwargs.get("certificate_name")
if kwargs.get("certificate_arn"):
self["certificateArn"] = kwargs.get("certificate_arn")
if kwargs.get("certificate_body"):
self["certificateBody"] = kwargs.get("certificate_body")
if kwargs.get("tags"):
self["tags"] = kwargs.get("tags" )
if kwargs.get("security_policy"):
self["securityPolicy"] = kwargs.get("security_policy")
if kwargs.get("certificate_chain"):
self["certificateChain"] = kwargs.get("certificate_chain")
if kwargs.get("regional_certificate_name"):
self["regionalCertificateName"] = kwargs.get(
"regional_certificate_name"
)
if kwargs.get("certificate_private_key"):
self["certificatePrivateKey"] = kwargs.get(
"certificate_private_key"
)
if kwargs.get("regional_certificate_arn"):
self["regionalCertificateArn"] = kwargs.get(
"regional_certificate_arn"
)
if kwargs.get("endpoint_configuration"):
self["endpointConfiguration"] = kwargs.get(
"endpoint_configuration"
)
if kwargs.get("generate_cli_skeleton"):
self["generateCliSkeleton"] = kwargs.get(
"generate_cli_skeleton"
)
class APIGatewayBackend(BaseBackend):
def __init__(self, region_name):
super(APIGatewayBackend, self).__init__()
@ -616,6 +661,7 @@ class APIGatewayBackend(BaseBackend):
self.keys = {}
self.usage_plans = {}
self.usage_plan_keys = {}
self.domain_names = {}
self.region_name = region_name
def reset(self):
@ -1001,6 +1047,42 @@ class APIGatewayBackend(BaseBackend):
except Exception:
return False
def create_domain_name(self, domain_name,
certificate_name=None, tags=None,
certificate_arn=None, certificate_body=None,
certificate_private_key=None,
certificate_chain=None,
regional_certificate_name=None,
regional_certificate_arn=None,
endpoint_configuration=None,
security_policy=None,
generate_cli_skeleton=None):
if not domain_name:
raise DomainNameNotFound()
new_domain_name = DomainName(
domain_name=domain_name,
certificate_name=certificate_name,
certificate_arn=certificate_arn,
certificate_body=certificate_body,
certificate_private_key=certificate_private_key,
certificate_chain=certificate_chain,
regional_certificate_name=regional_certificate_name,
regional_certificate_arn=regional_certificate_arn,
endpoint_configuration=endpoint_configuration,
tags=tags, security_policy=security_policy,
generate_cli_skeleton=generate_cli_skeleton,
)
self.domain_names[domain_name] = new_domain_name
return new_domain_name
def get_domain_names(self):
return list(self.domain_names.values())
def get_domain_name(self, domain_name):
return self.domain_names[domain_name]
apigateway_backends = {}
for region_name in Session().get_available_regions("apigateway"):

View File

@ -527,3 +527,62 @@ class APIGatewayResponse(BaseResponse):
usage_plan_id, key_id
)
return 200, {}, json.dumps(usage_plan_response)
def domain_names(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
try:
if self.method == "GET":
domain_names = self.backend.get_domain_names()
return 200, {}, json.dumps({"item": domain_names})
elif self.method == "POST":
domain_name = self._get_param("domainName")
certificate_name = self._get_param("certificateName")
tags = self._get_param("tags")
certificate_arn = self._get_param("certificateArn")
certificate_body = self._get_param("certificateBody")
certificate_private_key = self._get_param(
"certificatePrivateKey"
)
certificate_chain = self._get_param("certificateChain")
regional_certificate_name = self._get_param(
"regionalCertificateName"
)
regional_certificate_arn = self._get_param(
"regionalCertificateArn"
)
endpoint_configuration = self._get_param(
"endpointConfiguration"
)
security_policy = self._get_param("securityPolicy")
generate_cli_skeleton = self._get_param(
"generateCliSkeleton"
)
domain_name_resp = self.backend.create_domain_name(
domain_name, certificate_name, tags, certificate_arn,
certificate_body, certificate_private_key,
certificate_chain, regional_certificate_name,
regional_certificate_arn, endpoint_configuration,
security_policy, generate_cli_skeleton
)
return 200, {}, json.dumps(domain_name_resp)
except BadRequestException as e:
return self.error(
"com.amazonaws.dynamodb.v20111205#BadRequestException", e.message
)
def domain_name_induvidual(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
url_path_parts = self.path.split("/")
domain_name = url_path_parts[2]
domain_names={}
if self.method == "GET":
if domain_name is not None:
domain_names = self.backend.get_domain_name(domain_name)
return 200, {}, json.dumps(domain_names)

View File

@ -21,6 +21,8 @@ url_paths = {
"{0}/apikeys$": APIGatewayResponse().apikeys,
"{0}/apikeys/(?P<apikey>[^/]+)": APIGatewayResponse().apikey_individual,
"{0}/usageplans$": APIGatewayResponse().usage_plans,
"{0}/domainnames$": APIGatewayResponse().domain_names,
"{0}/domainnames/(?P<domain_name>[^/]+)/?$": APIGatewayResponse().domain_name_induvidual,
"{0}/usageplans/(?P<usage_plan_id>[^/]+)/?$": APIGatewayResponse().usage_plan_individual,
"{0}/usageplans/(?P<usage_plan_id>[^/]+)/keys$": APIGatewayResponse().usage_plan_keys,
"{0}/usageplans/(?P<usage_plan_id>[^/]+)/keys/(?P<api_key_id>[^/]+)/?$": APIGatewayResponse().usage_plan_key_individual,

View File

@ -1483,6 +1483,23 @@ def test_deployment():
stage["description"].should.equal("_new_description_")
@mock_apigateway
def test_create_domain_names():
client = boto3.client("apigateway", region_name="us-west-2")
domain_name = "testDomain"
test_certificate_name = "test.certificate"
test_certificate_private_key = "testPrivateKey"
response = client.create_domain_name(domainName=domain_name, certificateName=test_certificate_name,
certificatePrivateKey=test_certificate_private_key)
response["domainName"].should.equal(domain_name)
response["certificateName"].should.equal(test_certificate_name)
result = client.get_domain_names()
result["items"][0]["domainName"].should.equal(domain_name)
result = client.get_domain_name(domainName=domain_name)
result["domainName"].should.equal(domain_name)
@mock_apigateway
def test_http_proxying_integration():
responses.add(