diff --git a/moto/apigateway/exceptions.py b/moto/apigateway/exceptions.py index ccb870f52..c9c90cea5 100644 --- a/moto/apigateway/exceptions.py +++ b/moto/apigateway/exceptions.py @@ -119,3 +119,21 @@ class ApiKeyAlreadyExists(RESTError): super(ApiKeyAlreadyExists, self).__init__( "ConflictException", "API Key already exists" ) + + +class InvalidDomainName(BadRequestException): + code = 404 + + def __init__(self): + super(InvalidDomainName, self).__init__( + "BadRequestException", "No Domain Name specified" + ) + + +class DomainNameNotFound(RESTError): + code = 404 + + def __init__(self): + super(DomainNameNotFound, self).__init__( + "NotFoundException", "Invalid Domain Name specified" + ) diff --git a/moto/apigateway/models.py b/moto/apigateway/models.py index 5b02e6204..16462e278 100644 --- a/moto/apigateway/models.py +++ b/moto/apigateway/models.py @@ -34,6 +34,8 @@ from .exceptions import ( NoIntegrationDefined, NoMethodDefined, ApiKeyAlreadyExists, + DomainNameNotFound, + InvalidDomainName, ) STAGE_URL = "https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}" @@ -463,7 +465,6 @@ class RestAPI(BaseModel): self.deployments = {} self.authorizers = {} self.stages = {} - self.resources = {} self.add_child("/") # Add default child @@ -609,6 +610,41 @@ class RestAPI(BaseModel): return self.deployments.pop(deployment_id) +class DomainName(BaseModel, dict): + def __init__(self, domain_name, **kwargs): + super(DomainName, self).__init__() + self["domainName"] = domain_name + self["regionalDomainName"] = domain_name + self["distributionDomainName"] = domain_name + self["domainNameStatus"] = "AVAILABLE" + self["domainNameStatusMessage"] = "Domain Name Available" + self["regionalHostedZoneId"] = "Z2FDTNDATAQYW2" + self["distributionHostedZoneId"] = "Z2FDTNDATAQYW2" + self["certificateUploadDate"] = int(time.time()) + if kwargs.get("certificate_name"): + self["certificateName"] = kwargs.get("certificate_name") + if kwargs.get("certificate_arn"): + self["certificateArn"] = kwargs.get("certificate_arn") + if kwargs.get("certificate_body"): + self["certificateBody"] = kwargs.get("certificate_body") + if kwargs.get("tags"): + self["tags"] = kwargs.get("tags") + if kwargs.get("security_policy"): + self["securityPolicy"] = kwargs.get("security_policy") + if kwargs.get("certificate_chain"): + self["certificateChain"] = kwargs.get("certificate_chain") + if kwargs.get("regional_certificate_name"): + self["regionalCertificateName"] = kwargs.get("regional_certificate_name") + if kwargs.get("certificate_private_key"): + self["certificatePrivateKey"] = kwargs.get("certificate_private_key") + if kwargs.get("regional_certificate_arn"): + self["regionalCertificateArn"] = kwargs.get("regional_certificate_arn") + if kwargs.get("endpoint_configuration"): + self["endpointConfiguration"] = kwargs.get("endpoint_configuration") + if kwargs.get("generate_cli_skeleton"): + self["generateCliSkeleton"] = kwargs.get("generate_cli_skeleton") + + class APIGatewayBackend(BaseBackend): def __init__(self, region_name): super(APIGatewayBackend, self).__init__() @@ -616,6 +652,7 @@ class APIGatewayBackend(BaseBackend): self.keys = {} self.usage_plans = {} self.usage_plan_keys = {} + self.domain_names = {} self.region_name = region_name def reset(self): @@ -1001,6 +1038,53 @@ class APIGatewayBackend(BaseBackend): except Exception: return False + def create_domain_name( + self, + domain_name, + certificate_name=None, + tags=None, + certificate_arn=None, + certificate_body=None, + certificate_private_key=None, + certificate_chain=None, + regional_certificate_name=None, + regional_certificate_arn=None, + endpoint_configuration=None, + security_policy=None, + generate_cli_skeleton=None, + ): + + if not domain_name: + raise InvalidDomainName() + + new_domain_name = DomainName( + domain_name=domain_name, + certificate_name=certificate_name, + certificate_private_key=certificate_private_key, + certificate_arn=certificate_arn, + certificate_body=certificate_body, + certificate_chain=certificate_chain, + regional_certificate_name=regional_certificate_name, + regional_certificate_arn=regional_certificate_arn, + endpoint_configuration=endpoint_configuration, + tags=tags, + security_policy=security_policy, + generate_cli_skeleton=generate_cli_skeleton, + ) + + self.domain_names[domain_name] = new_domain_name + return new_domain_name + + def get_domain_names(self): + return list(self.domain_names.values()) + + def get_domain_name(self, domain_name): + domain_info = self.domain_names.get(domain_name) + if domain_info is None: + raise DomainNameNotFound + else: + return self.domain_names[domain_name] + apigateway_backends = {} for region_name in Session().get_available_regions("apigateway"): diff --git a/moto/apigateway/responses.py b/moto/apigateway/responses.py index f0ed6adc9..e4723f0d4 100644 --- a/moto/apigateway/responses.py +++ b/moto/apigateway/responses.py @@ -11,6 +11,8 @@ from .exceptions import ( AuthorizerNotFoundException, StageNotFoundException, ApiKeyAlreadyExists, + DomainNameNotFound, + InvalidDomainName, ) API_KEY_SOURCES = ["AUTHORIZER", "HEADER"] @@ -527,3 +529,69 @@ class APIGatewayResponse(BaseResponse): usage_plan_id, key_id ) return 200, {}, json.dumps(usage_plan_response) + + def domain_names(self, request, full_url, headers): + self.setup_class(request, full_url, headers) + + try: + if self.method == "GET": + domain_names = self.backend.get_domain_names() + return 200, {}, json.dumps({"item": domain_names}) + + elif self.method == "POST": + domain_name = self._get_param("domainName") + certificate_name = self._get_param("certificateName") + tags = self._get_param("tags") + certificate_arn = self._get_param("certificateArn") + certificate_body = self._get_param("certificateBody") + certificate_private_key = self._get_param("certificatePrivateKey") + certificate_chain = self._get_param("certificateChain") + regional_certificate_name = self._get_param("regionalCertificateName") + regional_certificate_arn = self._get_param("regionalCertificateArn") + endpoint_configuration = self._get_param("endpointConfiguration") + security_policy = self._get_param("securityPolicy") + generate_cli_skeleton = self._get_param("generateCliSkeleton") + domain_name_resp = self.backend.create_domain_name( + domain_name, + certificate_name, + tags, + certificate_arn, + certificate_body, + certificate_private_key, + certificate_chain, + regional_certificate_name, + regional_certificate_arn, + endpoint_configuration, + security_policy, + generate_cli_skeleton, + ) + return 200, {}, json.dumps(domain_name_resp) + + except InvalidDomainName as error: + return ( + error.code, + {}, + '{{"message":"{0}","code":"{1}"}}'.format( + error.message, error.error_type + ), + ) + + def domain_name_induvidual(self, request, full_url, headers): + self.setup_class(request, full_url, headers) + + url_path_parts = self.path.split("/") + domain_name = url_path_parts[2] + domain_names = {} + try: + if self.method == "GET": + if domain_name is not None: + domain_names = self.backend.get_domain_name(domain_name) + return 200, {}, json.dumps(domain_names) + except DomainNameNotFound as error: + return ( + error.code, + {}, + '{{"message":"{0}","code":"{1}"}}'.format( + error.message, error.error_type + ), + ) diff --git a/moto/apigateway/urls.py b/moto/apigateway/urls.py index 4ef6ae72b..6c3b7f6bb 100644 --- a/moto/apigateway/urls.py +++ b/moto/apigateway/urls.py @@ -21,6 +21,8 @@ url_paths = { "{0}/apikeys$": APIGatewayResponse().apikeys, "{0}/apikeys/(?P[^/]+)": APIGatewayResponse().apikey_individual, "{0}/usageplans$": APIGatewayResponse().usage_plans, + "{0}/domainnames$": APIGatewayResponse().domain_names, + "{0}/domainnames/(?P[^/]+)/?$": APIGatewayResponse().domain_name_induvidual, "{0}/usageplans/(?P[^/]+)/?$": APIGatewayResponse().usage_plan_individual, "{0}/usageplans/(?P[^/]+)/keys$": APIGatewayResponse().usage_plan_keys, "{0}/usageplans/(?P[^/]+)/keys/(?P[^/]+)/?$": APIGatewayResponse().usage_plan_key_individual, diff --git a/tests/test_apigateway/test_apigateway.py b/tests/test_apigateway/test_apigateway.py index 0952f2674..a1a380974 100644 --- a/tests/test_apigateway/test_apigateway.py +++ b/tests/test_apigateway/test_apigateway.py @@ -1483,6 +1483,70 @@ def test_deployment(): stage["description"].should.equal("_new_description_") +@mock_apigateway +def test_create_domain_names(): + client = boto3.client("apigateway", region_name="us-west-2") + domain_name = "testDomain" + test_certificate_name = "test.certificate" + test_certificate_private_key = "testPrivateKey" + # success case with valid params + response = client.create_domain_name( + domainName=domain_name, + certificateName=test_certificate_name, + certificatePrivateKey=test_certificate_private_key, + ) + response["domainName"].should.equal(domain_name) + response["certificateName"].should.equal(test_certificate_name) + # without domain name it should throw BadRequestException + with assert_raises(ClientError) as ex: + client.create_domain_name(domainName="") + + ex.exception.response["Error"]["Message"].should.equal("No Domain Name specified") + ex.exception.response["Error"]["Code"].should.equal("BadRequestException") + + +@mock_apigateway +def test_get_domain_names(): + client = boto3.client("apigateway", region_name="us-west-2") + # without any domain names already present + result = client.get_domain_names() + result["items"].should.equal([]) + domain_name = "testDomain" + test_certificate_name = "test.certificate" + response = client.create_domain_name( + domainName=domain_name, certificateName=test_certificate_name + ) + + response["domainName"].should.equal(domain_name) + response["certificateName"].should.equal(test_certificate_name) + response["domainNameStatus"].should.equal("AVAILABLE") + # after adding a new domain name + result = client.get_domain_names() + result["items"][0]["domainName"].should.equal(domain_name) + result["items"][0]["certificateName"].should.equal(test_certificate_name) + result["items"][0]["domainNameStatus"].should.equal("AVAILABLE") + + +@mock_apigateway +def test_get_domain_name(): + client = boto3.client("apigateway", region_name="us-west-2") + domain_name = "testDomain" + # quering an invalid domain name which is not present + with assert_raises(ClientError) as ex: + client.get_domain_name(domainName=domain_name) + + ex.exception.response["Error"]["Message"].should.equal( + "Invalid Domain Name specified" + ) + ex.exception.response["Error"]["Code"].should.equal("NotFoundException") + # adding a domain name + client.create_domain_name(domainName=domain_name) + # retrieving the data of added domain name. + result = client.get_domain_name(domainName=domain_name) + result["domainName"].should.equal(domain_name) + result["domainNameStatus"].should.equal("AVAILABLE") + + @mock_apigateway def test_http_proxying_integration(): responses.add(