Added more tests and coverage
This commit is contained in:
parent
856c07de63
commit
af57cfc7ec
@ -120,6 +120,16 @@ class ApiKeyAlreadyExists(RESTError):
|
||||
"ConflictException", "API Key already exists"
|
||||
)
|
||||
|
||||
|
||||
class InvalidDomainName(BadRequestException):
|
||||
code = 404
|
||||
|
||||
def __init__(self):
|
||||
super(InvalidDomainName, self).__init__(
|
||||
"BadRequestException", "No Domain Name specified"
|
||||
)
|
||||
|
||||
|
||||
class DomainNameNotFound(RESTError):
|
||||
code = 404
|
||||
|
||||
|
@ -34,7 +34,8 @@ from .exceptions import (
|
||||
NoIntegrationDefined,
|
||||
NoMethodDefined,
|
||||
ApiKeyAlreadyExists,
|
||||
DomainNameNotFound
|
||||
DomainNameNotFound,
|
||||
InvalidDomainName
|
||||
)
|
||||
|
||||
STAGE_URL = "https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}"
|
||||
@ -1047,25 +1048,26 @@ class APIGatewayBackend(BaseBackend):
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def create_domain_name(self, domain_name,
|
||||
certificate_name=None, tags=None,
|
||||
certificate_arn=None, certificate_body=None,
|
||||
certificate_private_key=None,
|
||||
def create_domain_name(self,domain_name,
|
||||
certificate_name=None,certificate_private_key=None,
|
||||
tags=None, certificate_arn=None,
|
||||
certificate_body=None,
|
||||
certificate_chain=None,
|
||||
regional_certificate_name=None,
|
||||
regional_certificate_arn=None,
|
||||
endpoint_configuration=None,
|
||||
security_policy=None,
|
||||
generate_cli_skeleton=None):
|
||||
|
||||
if not domain_name:
|
||||
raise DomainNameNotFound()
|
||||
raise InvalidDomainName()
|
||||
|
||||
new_domain_name = DomainName(
|
||||
domain_name=domain_name,
|
||||
certificate_name=certificate_name,
|
||||
certificate_private_key=certificate_private_key,
|
||||
certificate_arn=certificate_arn,
|
||||
certificate_body=certificate_body,
|
||||
certificate_private_key=certificate_private_key,
|
||||
certificate_chain=certificate_chain,
|
||||
regional_certificate_name=regional_certificate_name,
|
||||
regional_certificate_arn=regional_certificate_arn,
|
||||
@ -1081,7 +1083,11 @@ class APIGatewayBackend(BaseBackend):
|
||||
return list(self.domain_names.values())
|
||||
|
||||
def get_domain_name(self, domain_name):
|
||||
return self.domain_names[domain_name]
|
||||
domain_info = self.domain_names.get(domain_name)
|
||||
if domain_info is None:
|
||||
raise DomainNameNotFound
|
||||
else:
|
||||
return self.domain_names[domain_name]
|
||||
|
||||
|
||||
apigateway_backends = {}
|
||||
|
@ -11,6 +11,8 @@ from .exceptions import (
|
||||
AuthorizerNotFoundException,
|
||||
StageNotFoundException,
|
||||
ApiKeyAlreadyExists,
|
||||
DomainNameNotFound,
|
||||
InvalidDomainName
|
||||
)
|
||||
|
||||
API_KEY_SOURCES = ["AUTHORIZER", "HEADER"]
|
||||
@ -561,17 +563,22 @@ class APIGatewayResponse(BaseResponse):
|
||||
"generateCliSkeleton"
|
||||
)
|
||||
domain_name_resp = self.backend.create_domain_name(
|
||||
domain_name, certificate_name, tags, certificate_arn,
|
||||
certificate_body, certificate_private_key,
|
||||
certificate_chain, regional_certificate_name,
|
||||
regional_certificate_arn, endpoint_configuration,
|
||||
security_policy, generate_cli_skeleton
|
||||
domain_name, certificate_name,
|
||||
certificate_private_key,tags, certificate_arn,
|
||||
certificate_body, certificate_chain,
|
||||
regional_certificate_name, regional_certificate_arn,
|
||||
endpoint_configuration, security_policy,
|
||||
generate_cli_skeleton
|
||||
)
|
||||
|
||||
return 200, {}, json.dumps(domain_name_resp)
|
||||
except BadRequestException as e:
|
||||
return self.error(
|
||||
"com.amazonaws.dynamodb.v20111205#BadRequestException", e.message
|
||||
|
||||
except InvalidDomainName as error:
|
||||
return (
|
||||
error.code,
|
||||
{},
|
||||
'{{"message":"{0}","code":"{1}"}}'.format(
|
||||
error.message, error.error_type
|
||||
),
|
||||
)
|
||||
|
||||
def domain_name_induvidual(self, request, full_url, headers):
|
||||
@ -580,9 +587,19 @@ class APIGatewayResponse(BaseResponse):
|
||||
url_path_parts = self.path.split("/")
|
||||
domain_name = url_path_parts[2]
|
||||
domain_names={}
|
||||
try:
|
||||
if self.method == "GET":
|
||||
if domain_name is not None:
|
||||
domain_names = self.backend.get_domain_name(domain_name)
|
||||
return 200, {}, json.dumps(domain_names)
|
||||
|
||||
except DomainNameNotFound as error:
|
||||
return (
|
||||
error.code,
|
||||
{},
|
||||
'{{"message":"{0}","code":"{1}"}}'.format(
|
||||
error.message, error.error_type
|
||||
),
|
||||
)
|
||||
|
||||
if self.method == "GET":
|
||||
if domain_name is not None:
|
||||
domain_names = self.backend.get_domain_name(domain_name)
|
||||
|
||||
return 200, {}, json.dumps(domain_names)
|
||||
|
@ -1489,15 +1489,62 @@ def test_create_domain_names():
|
||||
domain_name = "testDomain"
|
||||
test_certificate_name = "test.certificate"
|
||||
test_certificate_private_key = "testPrivateKey"
|
||||
response = client.create_domain_name(domainName=domain_name, certificateName=test_certificate_name,
|
||||
certificatePrivateKey=test_certificate_private_key)
|
||||
# success case with valid params
|
||||
response = client.create_domain_name(domainName=domain_name,
|
||||
certificateName=test_certificate_name,
|
||||
certificatePrivateKey=test_certificate_private_key)
|
||||
|
||||
response["domainName"].should.equal(domain_name)
|
||||
response["certificateName"].should.equal(test_certificate_name)
|
||||
# without domain name it should throw BadRequestException
|
||||
with assert_raises(ClientError) as ex:
|
||||
client.create_domain_name(domainName="")
|
||||
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"No Domain Name specified")
|
||||
ex.exception.response["Error"]["Code"].should.equal(
|
||||
"BadRequestException")
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_get_domain_names():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
# without any domain names already present
|
||||
result = client.get_domain_names()
|
||||
result["items"].should.equal([])
|
||||
domain_name = "testDomain"
|
||||
test_certificate_name = "test.certificate"
|
||||
response = client.create_domain_name(domainName=domain_name,
|
||||
certificateName=test_certificate_name)
|
||||
|
||||
response["domainName"].should.equal(domain_name)
|
||||
response["certificateName"].should.equal(test_certificate_name)
|
||||
response["domainNameStatus"].should.equal("AVAILABLE")
|
||||
# after adding a new domain name
|
||||
result = client.get_domain_names()
|
||||
result["items"][0]["domainName"].should.equal(domain_name)
|
||||
result["items"][0]["certificateName"].should.equal(test_certificate_name)
|
||||
result["items"][0]["domainNameStatus"].should.equal("AVAILABLE")
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
def test_get_domain_name():
|
||||
client = boto3.client("apigateway", region_name="us-west-2")
|
||||
domain_name = "testDomain"
|
||||
# quering an invalid domain name which is not present
|
||||
with assert_raises(ClientError) as ex:
|
||||
client.get_domain_name(domainName=domain_name)
|
||||
|
||||
ex.exception.response["Error"]["Message"].should.equal(
|
||||
"Invalid Domain Name specified")
|
||||
ex.exception.response["Error"]["Code"].should.equal(
|
||||
"NotFoundException")
|
||||
# adding a domain name
|
||||
client.create_domain_name(domainName=domain_name)
|
||||
# retrieving the data of added domain name.
|
||||
result = client.get_domain_name(domainName=domain_name)
|
||||
result["domainName"].should.equal(domain_name)
|
||||
result["domainNameStatus"].should.equal("AVAILABLE")
|
||||
|
||||
|
||||
@mock_apigateway
|
||||
|
Loading…
Reference in New Issue
Block a user