Techdebt: Replace sure with regular asserts in APIGateway tests (#6366)

This commit is contained in:
Bert Blommers 2023-06-07 09:46:18 +00:00 committed by GitHub
parent b180f0a015
commit 5e89ceee69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 831 additions and 908 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,5 @@
import boto3 import boto3
import json import json
import sure # noqa # pylint: disable=unused-import
from moto import mock_lambda, mock_cloudformation, mock_apigateway, mock_iam, mock_logs from moto import mock_lambda, mock_cloudformation, mock_apigateway, mock_iam, mock_logs
@ -340,24 +339,24 @@ def test_simple_apigateway_with_lambda_proxy():
# #
# Verify Rest API was created # Verify Rest API was created
api = apigw.get_rest_apis()["items"][0] api = apigw.get_rest_apis()["items"][0]
api["id"].should.equal(api_id) assert api["id"] == api_id
api["name"].should.equal("dev-timeseries-service") assert api["name"] == "dev-timeseries-service"
# #
# Verify Gateway Resource was created # Verify Gateway Resource was created
paths = apigw.get_resources(restApiId=api_id)["items"] paths = apigw.get_resources(restApiId=api_id)["items"]
root_path = [p for p in paths if p["path"] == "/"][0] root_path = [p for p in paths if p["path"] == "/"][0]
hello_path = [p for p in paths if p["path"] == "/hello"][0] hello_path = [p for p in paths if p["path"] == "/hello"][0]
hello_path["parentId"].should.equal(root_path["id"]) assert hello_path["parentId"] == root_path["id"]
# #
# Verify Gateway Method was created # Verify Gateway Method was created
m = apigw.get_method( m = apigw.get_method(
restApiId=api_id, resourceId=hello_path["id"], httpMethod="GET" restApiId=api_id, resourceId=hello_path["id"], httpMethod="GET"
) )
m["httpMethod"].should.equal("GET") assert m["httpMethod"] == "GET"
# #
# Verify a Gateway Deployment was created # Verify a Gateway Deployment was created
d = apigw.get_deployments(restApiId=api_id)["items"] d = apigw.get_deployments(restApiId=api_id)["items"]
d.should.have.length_of(1) assert len(d) == 1
# #
# Verify Lambda function was created # Verify Lambda function was created
awslambda.get_function(FunctionName=fn_name) # Will throw 404 if it doesn't exist awslambda.get_function(FunctionName=fn_name) # Will throw 404 if it doesn't exist
@ -365,9 +364,13 @@ def test_simple_apigateway_with_lambda_proxy():
# Verify Lambda Permission was created # Verify Lambda Permission was created
policy = json.loads(awslambda.get_policy(FunctionName=fn_name)["Policy"]) policy = json.loads(awslambda.get_policy(FunctionName=fn_name)["Policy"])
statement = policy["Statement"][0] statement = policy["Statement"][0]
statement["FunctionName"].should.contain(fn_name) assert (
statement["Condition"]["ArnLike"]["AWS:SourceArn"].should.equal( statement["FunctionName"]
f"arn:aws:execute-api:us-east-1:123456789012:{api_id}/*/*" == f"arn:aws:lambda:us-east-1:123456789012:function:{fn_name}"
)
assert (
statement["Condition"]["ArnLike"]["AWS:SourceArn"]
== f"arn:aws:execute-api:us-east-1:123456789012:{api_id}/*/*"
) )
@ -380,4 +383,4 @@ def test_apigateway_with_unknown_description():
cf.create_stack(StackName="teststack", TemplateBody=template_with_missing_sub) cf.create_stack(StackName="teststack", TemplateBody=template_with_missing_sub)
api = apigw.get_rest_apis()["items"][0] api = apigw.get_rest_apis()["items"][0]
api.should.have.key("description").equals("${UnknownResource}") assert api["description"] == "${UnknownResource}"

View File

@ -1,5 +1,4 @@
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from moto import mock_apigateway from moto import mock_apigateway
@ -17,9 +16,10 @@ def test_create_deployment_requires_REST_methods():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.create_deployment(restApiId=api_id, stageName=stage_name)["id"] client.create_deployment(restApiId=api_id, stageName=stage_name)["id"]
ex.value.response["Error"]["Code"].should.equal("BadRequestException") assert ex.value.response["Error"]["Code"] == "BadRequestException"
ex.value.response["Error"]["Message"].should.equal( assert (
"The REST API doesn't contain any methods" ex.value.response["Error"]["Message"]
== "The REST API doesn't contain any methods"
) )
@ -38,10 +38,8 @@ def test_create_deployment_requires_REST_method_integrations():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.create_deployment(restApiId=api_id, stageName=stage_name)["id"] client.create_deployment(restApiId=api_id, stageName=stage_name)["id"]
ex.value.response["Error"]["Code"].should.equal("NotFoundException") assert ex.value.response["Error"]["Code"] == "NotFoundException"
ex.value.response["Error"]["Message"].should.equal( assert ex.value.response["Error"]["Message"] == "No integration defined for method"
"No integration defined for method"
)
@mock_apigateway @mock_apigateway
@ -53,7 +51,7 @@ def test_create_simple_deployment_with_get_method():
create_method_integration(client, api_id) create_method_integration(client, api_id)
deployment = client.create_deployment(restApiId=api_id, stageName=stage_name) deployment = client.create_deployment(restApiId=api_id, stageName=stage_name)
deployment.should.have.key("id") assert "id" in deployment
@mock_apigateway @mock_apigateway
@ -65,7 +63,7 @@ def test_create_simple_deployment_with_post_method():
create_method_integration(client, api_id, httpMethod="POST") create_method_integration(client, api_id, httpMethod="POST")
deployment = client.create_deployment(restApiId=api_id, stageName=stage_name) deployment = client.create_deployment(restApiId=api_id, stageName=stage_name)
deployment.should.have.key("id") assert "id" in deployment
@mock_apigateway @mock_apigateway
@ -80,10 +78,8 @@ def test_create_deployment_minimal():
deployment_id = response["id"] deployment_id = response["id"]
response = client.get_deployment(restApiId=api_id, deploymentId=deployment_id) response = client.get_deployment(restApiId=api_id, deploymentId=deployment_id)
response.should.have.key("id").equals(deployment_id) assert response["id"] == deployment_id
response.should.have.key("ResponseMetadata").should.have.key( assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
"HTTPStatusCode"
).equals(200)
@mock_apigateway @mock_apigateway
@ -97,13 +93,13 @@ def test_create_deployment_with_empty_stage():
deployment_id = response["id"] deployment_id = response["id"]
response = client.get_deployment(restApiId=api_id, deploymentId=deployment_id) response = client.get_deployment(restApiId=api_id, deploymentId=deployment_id)
response.should.have.key("id") assert "id" in response
response.should.have.key("createdDate") assert "createdDate" in response
response.shouldnt.have.key("stageName") assert "stageName" not in response
# This should not create an empty stage # This should not create an empty stage
stages = client.get_stages(restApiId=api_id)["item"] stages = client.get_stages(restApiId=api_id)["item"]
stages.should.equal([]) assert stages == []
@mock_apigateway @mock_apigateway
@ -118,10 +114,10 @@ def test_get_deployments():
deployment_id = response["id"] deployment_id = response["id"]
response = client.get_deployments(restApiId=api_id) response = client.get_deployments(restApiId=api_id)
response.should.have.key("items").length_of(1) assert len(response["items"]) == 1
response["items"][0].pop("createdDate") response["items"][0].pop("createdDate")
response["items"].should.equal([{"id": deployment_id}]) assert response["items"] == [{"id": deployment_id}]
@mock_apigateway @mock_apigateway
@ -143,8 +139,8 @@ def test_create_multiple_deployments():
response = client.get_deployments(restApiId=api_id) response = client.get_deployments(restApiId=api_id)
response["items"][0]["id"].should.match(rf"{deployment_id2}|{deployment_id}") assert response["items"][0]["id"] in [deployment_id, deployment_id2]
response["items"][1]["id"].should.match(rf"{deployment_id2}|{deployment_id}") assert response["items"][1]["id"] in [deployment_id, deployment_id2]
@mock_apigateway @mock_apigateway
@ -162,38 +158,39 @@ def test_delete_deployment__requires_stage_to_be_deleted():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.delete_deployment(restApiId=api_id, deploymentId=deployment_id) client.delete_deployment(restApiId=api_id, deploymentId=deployment_id)
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal( assert (
"Active stages pointing to this deployment must be moved or deleted" err["Message"]
== "Active stages pointing to this deployment must be moved or deleted"
) )
# Deployment still exists # Deployment still exists
deployments = client.get_deployments(restApiId=api_id)["items"] deployments = client.get_deployments(restApiId=api_id)["items"]
deployments.should.have.length_of(1) assert len(deployments) == 1
# Stage still exists # Stage still exists
stages = client.get_stages(restApiId=api_id)["item"] stages = client.get_stages(restApiId=api_id)["item"]
stages.should.have.length_of(1) assert len(stages) == 1
# Delete stage first # Delete stage first
resp = client.delete_stage(restApiId=api_id, stageName=stage_name) resp = client.delete_stage(restApiId=api_id, stageName=stage_name)
resp["ResponseMetadata"].should.have.key("HTTPStatusCode").equals(202) assert resp["ResponseMetadata"]["HTTPStatusCode"] == 202
# Deployment still exists # Deployment still exists
deployments = client.get_deployments(restApiId=api_id)["items"] deployments = client.get_deployments(restApiId=api_id)["items"]
deployments.should.have.length_of(1) assert len(deployments) == 1
# Now delete deployment # Now delete deployment
resp = client.delete_deployment(restApiId=api_id, deploymentId=deployment_id) resp = client.delete_deployment(restApiId=api_id, deploymentId=deployment_id)
resp["ResponseMetadata"].should.have.key("HTTPStatusCode").equals(202) assert resp["ResponseMetadata"]["HTTPStatusCode"] == 202
# Deployment is gone # Deployment is gone
deployments = client.get_deployments(restApiId=api_id)["items"] deployments = client.get_deployments(restApiId=api_id)["items"]
deployments.should.have.length_of(0) assert len(deployments) == 0
# Stage is gone # Stage is gone
stages = client.get_stages(restApiId=api_id)["item"] stages = client.get_stages(restApiId=api_id)["item"]
stages.should.have.length_of(0) assert len(stages) == 0
@mock_apigateway @mock_apigateway
@ -206,5 +203,5 @@ def test_delete_unknown_deployment():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.delete_deployment(restApiId=api_id, deploymentId="unknown") client.delete_deployment(restApiId=api_id, deploymentId="unknown")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"
err["Message"].should.equal("Invalid Deployment identifier specified") assert err["Message"] == "Invalid Deployment identifier specified"

View File

@ -31,16 +31,16 @@ def test_import_rest_api__api_is_created():
client.create_stage(restApiId=api_id, stageName="stg", deploymentId=deployment_id) client.create_stage(restApiId=api_id, stageName="stg", deploymentId=deployment_id)
resp = client.get_export(restApiId=api_id, stageName="stg", exportType="swagger") resp = client.get_export(restApiId=api_id, stageName="stg", exportType="swagger")
resp.should.have.key("contentType").equals("application/octet-stream") assert resp["contentType"] == "application/octet-stream"
resp.should.have.key("contentDisposition").match('attachment; filename="swagger') assert 'attachment; filename="swagger' in resp["contentDisposition"]
body = json.loads(resp["body"].read().decode("utf-8")) body = json.loads(resp["body"].read().decode("utf-8"))
body["info"].should.have.key("title").equals("Swagger Petstore - OpenAPI 3.0") assert body["info"]["title"] == "Swagger Petstore - OpenAPI 3.0"
body["paths"].should.have.length_of(15) assert len(body["paths"]) == 15
body["paths"]["/pet/{petId}"].should.have.length_of(3) assert len(body["paths"]["/pet/{petId}"]) == 3
set(body["paths"]["/pet/{petId}"].keys()).should.equal({"DELETE", "GET", "POST"}) assert set(body["paths"]["/pet/{petId}"].keys()) == {"DELETE", "GET", "POST"}
@mock_apigateway @mock_apigateway
@ -50,8 +50,8 @@ def test_export_api__unknown_api():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_export(restApiId="unknown", stageName="l", exportType="a") client.get_export(restApiId="unknown", stageName="l", exportType="a")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"
err["Message"].should.equal("Invalid stage identifier specified") assert err["Message"] == "Invalid stage identifier specified"
@mock_apigateway @mock_apigateway
@ -77,5 +77,5 @@ def test_export_api__unknown_export_type():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_export(restApiId=api_id, stageName="dep", exportType="a") client.get_export(restApiId=api_id, stageName="dep", exportType="a")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal("No API exporter for type 'a'") assert err["Message"] == "No API exporter for type 'a'"

View File

@ -12,8 +12,8 @@ def test_put_gateway_response_minimal():
resp = client.put_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX") resp = client.put_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX")
resp.should.have.key("responseType").equals("DEFAULT_4XX") assert resp["responseType"] == "DEFAULT_4XX"
resp.should.have.key("defaultResponse").equals(False) assert resp["defaultResponse"] is False
@mock_apigateway @mock_apigateway
@ -31,15 +31,15 @@ def test_put_gateway_response():
}, },
) )
resp.should.have.key("responseType").equals("DEFAULT_4XX") assert resp["responseType"] == "DEFAULT_4XX"
resp.should.have.key("defaultResponse").equals(False) assert resp["defaultResponse"] is False
resp.should.have.key("statusCode").equals("401") assert resp["statusCode"] == "401"
resp.should.have.key("responseParameters").equals( assert resp["responseParameters"] == {
{"gatewayresponse.header.Authorization": "'Basic'"} "gatewayresponse.header.Authorization": "'Basic'"
) }
resp.should.have.key("responseTemplates").equals( assert resp["responseTemplates"] == {
{"application/xml": "#set($inputRoot = $input.path('$'))\n{ }"} "application/xml": "#set($inputRoot = $input.path('$'))\n{ }"
) }
@mock_apigateway @mock_apigateway
@ -51,8 +51,8 @@ def test_get_gateway_response_minimal():
resp = client.get_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX") resp = client.get_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX")
resp.should.have.key("responseType").equals("DEFAULT_4XX") assert resp["responseType"] == "DEFAULT_4XX"
resp.should.have.key("defaultResponse").equals(False) assert resp["defaultResponse"] is False
@mock_apigateway @mock_apigateway
@ -72,15 +72,15 @@ def test_get_gateway_response():
resp = client.get_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX") resp = client.get_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX")
resp.should.have.key("responseType").equals("DEFAULT_4XX") assert resp["responseType"] == "DEFAULT_4XX"
resp.should.have.key("defaultResponse").equals(False) assert resp["defaultResponse"] is False
resp.should.have.key("statusCode").equals("401") assert resp["statusCode"] == "401"
resp.should.have.key("responseParameters").equals( assert resp["responseParameters"] == {
{"gatewayresponse.header.Authorization": "'Basic'"} "gatewayresponse.header.Authorization": "'Basic'"
) }
resp.should.have.key("responseTemplates").equals( assert resp["responseTemplates"] == {
{"application/xml": "#set($inputRoot = $input.path('$'))\n{ }"} "application/xml": "#set($inputRoot = $input.path('$'))\n{ }"
) }
@mock_apigateway @mock_apigateway
@ -91,7 +91,7 @@ def test_get_gateway_response_unknown():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX") client.get_gateway_response(restApiId=api_id, responseType="DEFAULT_4XX")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"
@mock_apigateway @mock_apigateway
@ -100,7 +100,7 @@ def test_get_gateway_responses_empty():
api_id = client.create_rest_api(name="my_api", description="d")["id"] api_id = client.create_rest_api(name="my_api", description="d")["id"]
resp = client.get_gateway_responses(restApiId=api_id) resp = client.get_gateway_responses(restApiId=api_id)
resp.should.have.key("items").equals([]) assert resp["items"] == []
@mock_apigateway @mock_apigateway
@ -113,14 +113,14 @@ def test_get_gateway_responses():
) )
resp = client.get_gateway_responses(restApiId=api_id) resp = client.get_gateway_responses(restApiId=api_id)
resp.should.have.key("items").length_of(2) assert len(resp["items"]) == 2
resp["items"].should.contain( assert {"responseType": "DEFAULT_4XX", "defaultResponse": False} in resp["items"]
{"responseType": "DEFAULT_4XX", "defaultResponse": False} assert {
) "responseType": "DEFAULT_5XX",
resp["items"].should.contain( "defaultResponse": False,
{"responseType": "DEFAULT_5XX", "defaultResponse": False, "statusCode": "503"} "statusCode": "503",
) } in resp["items"]
@mock_apigateway @mock_apigateway
@ -133,12 +133,10 @@ def test_delete_gateway_response():
) )
resp = client.get_gateway_responses(restApiId=api_id) resp = client.get_gateway_responses(restApiId=api_id)
resp.should.have.key("items").length_of(2) assert len(resp["items"]) == 2
resp = client.delete_gateway_response(restApiId=api_id, responseType="DEFAULT_5XX") client.delete_gateway_response(restApiId=api_id, responseType="DEFAULT_5XX")
resp = client.get_gateway_responses(restApiId=api_id) resp = client.get_gateway_responses(restApiId=api_id)
resp.should.have.key("items").length_of(1) assert len(resp["items"]) == 1
resp["items"].should.contain( assert {"responseType": "DEFAULT_4XX", "defaultResponse": False} in resp["items"]
{"responseType": "DEFAULT_4XX", "defaultResponse": False}
)

View File

@ -16,17 +16,13 @@ def test_import_rest_api__api_is_created():
with open(path + "/resources/test_api.json", "rb") as api_json: with open(path + "/resources/test_api.json", "rb") as api_json:
response = client.import_rest_api(body=api_json.read()) response = client.import_rest_api(body=api_json.read())
response.should.have.key("id") assert "id" in response
response.should.have.key("name").which.should.equal("doc") assert response["name"] == "doc"
response.should.have.key("description").which.should.equal( assert response["description"] == "description from JSON file"
"description from JSON file"
)
response = client.get_rest_api(restApiId=response["id"]) response = client.get_rest_api(restApiId=response["id"])
response.should.have.key("name").which.should.equal("doc") assert response["name"] == "doc"
response.should.have.key("description").which.should.equal( assert response["description"] == "description from JSON file"
"description from JSON file"
)
@mock_apigateway @mock_apigateway
@ -38,14 +34,14 @@ def test_import_rest_api__nested_api():
response = client.import_rest_api(body=api_json.read()) response = client.import_rest_api(body=api_json.read())
resources = client.get_resources(restApiId=response["id"])["items"] resources = client.get_resources(restApiId=response["id"])["items"]
resources.should.have.length_of(5) assert len(resources) == 5
paths = [res["path"] for res in resources] paths = [res["path"] for res in resources]
paths.should.contain("/") assert "/" in paths
paths.should.contain("/test") assert "/test" in paths
paths.should.contain("/test/some") assert "/test/some" in paths
paths.should.contain("/test/some/deep") assert "/test/some/deep" in paths
paths.should.contain("/test/some/deep/path") assert "/test/some/deep/path" in paths
@mock_apigateway @mock_apigateway
@ -60,12 +56,13 @@ def test_import_rest_api__invalid_api_creates_nothing():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.import_rest_api(body=api_json.read(), failOnWarnings=True) client.import_rest_api(body=api_json.read(), failOnWarnings=True)
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal( assert (
"Failed to parse the uploaded OpenAPI document due to: 'paths' is a required property" err["Message"]
== "Failed to parse the uploaded OpenAPI document due to: 'paths' is a required property"
) )
client.get_rest_apis().should.have.key("items").length_of(0) assert len(client.get_rest_apis()["items"]) == 0
@mock_apigateway @mock_apigateway
@ -82,7 +79,7 @@ def test_import_rest_api__methods_are_created():
# We have a GET-method # We have a GET-method
resp = client.get_method(restApiId=api_id, resourceId=root_id, httpMethod="GET") resp = client.get_method(restApiId=api_id, resourceId=root_id, httpMethod="GET")
resp["methodResponses"].should.equal({"200": {"statusCode": "200"}}) assert resp["methodResponses"] == {"200": {"statusCode": "200"}}
# We have a POST on /test # We have a POST on /test
test_path_id = [res for res in resources["items"] if res["path"] == "/test"][0][ test_path_id = [res for res in resources["items"] if res["path"] == "/test"][0][
@ -91,4 +88,4 @@ def test_import_rest_api__methods_are_created():
resp = client.get_method( resp = client.get_method(
restApiId=api_id, resourceId=test_path_id, httpMethod="POST" restApiId=api_id, resourceId=test_path_id, httpMethod="POST"
) )
resp["methodResponses"].should.equal({"201": {"statusCode": "201"}}) assert resp["methodResponses"] == {"201": {"statusCode": "201"}}

View File

@ -51,7 +51,7 @@ def test_http_integration():
f"https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}" f"https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}"
) )
requests.get(deploy_url).content.should.equal(b"a fake response") assert requests.get(deploy_url).content == b"a fake response"
@mock_apigateway @mock_apigateway
@ -75,8 +75,8 @@ def test_aws_integration_dynamodb():
f"https://{api_id}.execute-api.us-west-2.amazonaws.com/{stage_name}", f"https://{api_id}.execute-api.us-west-2.amazonaws.com/{stage_name}",
json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}}, json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}},
) )
res.status_code.should.equal(200) assert res.status_code == 200
res.content.should.equal(b"{}") assert res.content == b"{}"
@mock_apigateway @mock_apigateway
@ -100,20 +100,20 @@ def test_aws_integration_dynamodb_multiple_stages():
f"https://{api_id}.execute-api.us-west-2.amazonaws.com/dev", f"https://{api_id}.execute-api.us-west-2.amazonaws.com/dev",
json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}}, json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}},
) )
res.status_code.should.equal(200) assert res.status_code == 200
res = requests.put( res = requests.put(
f"https://{api_id}.execute-api.us-west-2.amazonaws.com/staging", f"https://{api_id}.execute-api.us-west-2.amazonaws.com/staging",
json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}}, json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}},
) )
res.status_code.should.equal(200) assert res.status_code == 200
# We haven't pushed to prod yet # We haven't pushed to prod yet
res = requests.put( res = requests.put(
f"https://{api_id}.execute-api.us-west-2.amazonaws.com/prod", f"https://{api_id}.execute-api.us-west-2.amazonaws.com/prod",
json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}}, json={"TableName": table_name, "Item": {"name": {"S": "the-key"}}},
) )
res.status_code.should.equal(400) assert res.status_code == 400
@mock_apigateway @mock_apigateway
@ -153,17 +153,17 @@ def test_aws_integration_dynamodb_multiple_resources():
"Item": {"name": {"S": "the-key"}, "attr2": {"S": "sth"}}, "Item": {"name": {"S": "the-key"}, "attr2": {"S": "sth"}},
}, },
) )
res.status_code.should.equal(200) assert res.status_code == 200
# Get item from child resource # Get item from child resource
res = requests.get( res = requests.get(
f"https://{api_id}.execute-api.us-west-2.amazonaws.com/dev/item", f"https://{api_id}.execute-api.us-west-2.amazonaws.com/dev/item",
json={"TableName": table_name, "Key": {"name": {"S": "the-key"}}}, json={"TableName": table_name, "Key": {"name": {"S": "the-key"}}},
) )
res.status_code.should.equal(200) assert res.status_code == 200
json.loads(res.content).should.equal( assert json.loads(res.content) == {
{"Item": {"name": {"S": "the-key"}, "attr2": {"S": "sth"}}} "Item": {"name": {"S": "the-key"}, "attr2": {"S": "sth"}}
) }
@mock_apigateway @mock_apigateway
@ -180,7 +180,7 @@ def test_aws_integration_sagemaker():
response = client.get_integration( response = client.get_integration(
restApiId=api_id, resourceId=resource_id, httpMethod="PUT" restApiId=api_id, resourceId=resource_id, httpMethod="PUT"
) )
response.should.have.key("uri").equals(integration_action) assert response["uri"] == integration_action
def create_table(dynamodb, table_name): def create_table(dynamodb, table_name):

View File

@ -24,9 +24,9 @@ def test_put_rest_api__api_details_are_persisted():
body=api_json.read(), body=api_json.read(),
) )
response.should.have.key("id").which.should.equal(api_id) assert response["id"] == api_id
response.should.have.key("name").which.should.equal("my_api") assert response["name"] == "my_api"
response.should.have.key("description").which.should.equal("this is my api") assert response["description"] == "this is my api"
@mock_apigateway @mock_apigateway
@ -45,7 +45,7 @@ def test_put_rest_api__methods_are_created():
# We have a GET-method # We have a GET-method
resp = client.get_method(restApiId=api_id, resourceId=root_id, httpMethod="GET") resp = client.get_method(restApiId=api_id, resourceId=root_id, httpMethod="GET")
resp["methodResponses"].should.equal({"200": {"statusCode": "200"}}) assert resp["methodResponses"] == {"200": {"statusCode": "200"}}
# We have a POST on /test # We have a POST on /test
test_path_id = [res for res in resources["items"] if res["path"] == "/test"][0][ test_path_id = [res for res in resources["items"] if res["path"] == "/test"][0][
@ -54,7 +54,7 @@ def test_put_rest_api__methods_are_created():
resp = client.get_method( resp = client.get_method(
restApiId=api_id, resourceId=test_path_id, httpMethod="POST" restApiId=api_id, resourceId=test_path_id, httpMethod="POST"
) )
resp["methodResponses"].should.equal({"201": {"statusCode": "201"}}) assert resp["methodResponses"] == {"201": {"statusCode": "201"}}
@mock_apigateway @mock_apigateway
@ -79,7 +79,7 @@ def test_put_rest_api__existing_methods_are_overwritten():
response = client.get_method( response = client.get_method(
restApiId=api_id, resourceId=root_id, httpMethod="POST" restApiId=api_id, resourceId=root_id, httpMethod="POST"
) )
response.should.have.key("httpMethod").equals("POST") assert response["httpMethod"] == "POST"
path = os.path.dirname(os.path.abspath(__file__)) path = os.path.dirname(os.path.abspath(__file__))
with open(path + "/resources/test_api.json", "rb") as api_json: with open(path + "/resources/test_api.json", "rb") as api_json:
@ -96,13 +96,13 @@ def test_put_rest_api__existing_methods_are_overwritten():
resource for resource in resources["items"] if resource["path"] == "/" resource for resource in resources["items"] if resource["path"] == "/"
][0]["id"] ][0]["id"]
new_root_id.shouldnt.equal(root_id) assert new_root_id != root_id
# Our POST-method should be gone # Our POST-method should be gone
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_method(restApiId=api_id, resourceId=new_root_id, httpMethod="POST") client.get_method(restApiId=api_id, resourceId=new_root_id, httpMethod="POST")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"
# We just have a GET-method, as defined in the JSON # We just have a GET-method, as defined in the JSON
client.get_method(restApiId=api_id, resourceId=new_root_id, httpMethod="GET") client.get_method(restApiId=api_id, resourceId=new_root_id, httpMethod="GET")
@ -139,7 +139,7 @@ def test_put_rest_api__existing_methods_still_exist():
response = client.get_method( response = client.get_method(
restApiId=api_id, resourceId=root_id, httpMethod="POST" restApiId=api_id, resourceId=root_id, httpMethod="POST"
) )
response.should.have.key("httpMethod").equals("POST") assert response["httpMethod"] == "POST"
@mock_apigateway @mock_apigateway
@ -159,9 +159,10 @@ def test_put_rest_api__fail_on_invalid_spec():
restApiId=api_id, failOnWarnings=True, body=api_json.read() restApiId=api_id, failOnWarnings=True, body=api_json.read()
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal( assert (
"Failed to parse the uploaded OpenAPI document due to: 'paths' is a required property" err["Message"]
== "Failed to parse the uploaded OpenAPI document due to: 'paths' is a required property"
) )
@ -179,8 +180,8 @@ def test_put_rest_api__fail_on_invalid_version():
restApiId=api_id, failOnWarnings=True, body=api_json.read() restApiId=api_id, failOnWarnings=True, body=api_json.read()
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal("Only OpenAPI 3.x.x are currently supported") assert err["Message"] == "Only OpenAPI 3.x.x are currently supported"
@mock_apigateway @mock_apigateway
@ -195,9 +196,10 @@ def test_put_rest_api__fail_on_invalid_mode():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.put_rest_api(restApiId=api_id, mode="unknown", body=api_json.read()) client.put_rest_api(restApiId=api_id, mode="unknown", body=api_json.read())
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal( assert (
'Enumeration value of OpenAPI import mode must be "overwrite" or "merge"' err["Message"]
== 'Enumeration value of OpenAPI import mode must be "overwrite" or "merge"'
) )
@ -217,6 +219,6 @@ def test_put_rest_api__as_yaml():
body=api_yaml.read(), body=api_yaml.read(),
) )
response.should.have.key("id").which.should.equal(api_id) assert response["id"] == api_id
response.should.have.key("name").which.should.equal("my_api") assert response["name"] == "my_api"
response.should.have.key("description").which.should.equal("this is my api") assert response["description"] == "this is my api"

View File

@ -1,6 +1,5 @@
import boto3 import boto3
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
from moto import mock_apigateway from moto import mock_apigateway
@ -24,20 +23,18 @@ def test_create_stage_minimal():
restApiId=api_id, stageName=new_stage_name, deploymentId=deployment_id restApiId=api_id, stageName=new_stage_name, deploymentId=deployment_id
) )
response.should.have.key("stageName").equals(new_stage_name) assert response["stageName"] == new_stage_name
response.should.have.key("deploymentId").equals(deployment_id) assert response["deploymentId"] == deployment_id
response.should.have.key("methodSettings").equals({}) assert response["methodSettings"] == {}
response.should.have.key("variables").equals({}) assert response["variables"] == {}
response.should.have.key("ResponseMetadata").should.have.key( assert response["ResponseMetadata"]["HTTPStatusCode"] == 201
"HTTPStatusCode" assert response["description"] == ""
).equals(201) assert "cacheClusterStatus" not in response
response.should.have.key("description").equals("") assert response["cacheClusterEnabled"] is False
response.shouldnt.have.key("cacheClusterStatus")
response.should.have.key("cacheClusterEnabled").equals(False)
stage = client.get_stage(restApiId=api_id, stageName=new_stage_name) stage = client.get_stage(restApiId=api_id, stageName=new_stage_name)
stage["stageName"].should.equal(new_stage_name) assert stage["stageName"] == new_stage_name
stage["deploymentId"].should.equal(deployment_id) assert stage["deploymentId"] == deployment_id
@mock_apigateway @mock_apigateway
@ -59,21 +56,19 @@ def test_create_stage_with_env_vars():
variables={"env": "dev"}, variables={"env": "dev"},
) )
response.should.have.key("stageName").equals(new_stage_name_with_vars) assert response["stageName"] == new_stage_name_with_vars
response.should.have.key("deploymentId").equals(deployment_id) assert response["deploymentId"] == deployment_id
response.should.have.key("methodSettings").equals({}) assert response["methodSettings"] == {}
response.should.have.key("variables").equals({"env": "dev"}) assert response["variables"] == {"env": "dev"}
response.should.have.key("ResponseMetadata").should.have.key( assert response["ResponseMetadata"]["HTTPStatusCode"] == 201
"HTTPStatusCode" assert response["description"] == ""
).equals(201) assert "cacheClusterStatus" not in response
response.should.have.key("description").equals("") assert response["cacheClusterEnabled"] is False
response.shouldnt.have.key("cacheClusterStatus")
response.should.have.key("cacheClusterEnabled").equals(False)
stage = client.get_stage(restApiId=api_id, stageName=new_stage_name_with_vars) stage = client.get_stage(restApiId=api_id, stageName=new_stage_name_with_vars)
stage["stageName"].should.equal(new_stage_name_with_vars) assert stage["stageName"] == new_stage_name_with_vars
stage["deploymentId"].should.equal(deployment_id) assert stage["deploymentId"] == deployment_id
stage["variables"].should.have.key("env").which.should.match("dev") assert stage["variables"]["env"] == "dev"
@mock_apigateway @mock_apigateway
@ -97,21 +92,19 @@ def test_create_stage_with_vars_and_cache():
description="hello moto", description="hello moto",
) )
response.should.have.key("stageName").equals(new_stage_name) assert response["stageName"] == new_stage_name
response.should.have.key("deploymentId").equals(deployment_id) assert response["deploymentId"] == deployment_id
response.should.have.key("methodSettings").equals({}) assert response["methodSettings"] == {}
response.should.have.key("variables").equals({"env": "dev"}) assert response["variables"] == {"env": "dev"}
response.should.have.key("ResponseMetadata").should.have.key( assert response["ResponseMetadata"]["HTTPStatusCode"] == 201
"HTTPStatusCode" assert response["description"] == "hello moto"
).equals(201) assert response["cacheClusterStatus"] == "AVAILABLE"
response.should.have.key("description").equals("hello moto") assert response["cacheClusterEnabled"] is True
response.should.have.key("cacheClusterStatus").equals("AVAILABLE") assert response["cacheClusterSize"] == "0.5"
response.should.have.key("cacheClusterEnabled").equals(True)
response.should.have.key("cacheClusterSize").equals("0.5")
stage = client.get_stage(restApiId=api_id, stageName=new_stage_name) stage = client.get_stage(restApiId=api_id, stageName=new_stage_name)
stage["cacheClusterSize"].should.equal("0.5") assert stage["cacheClusterSize"] == "0.5"
@mock_apigateway @mock_apigateway
@ -137,24 +130,22 @@ def test_create_stage_with_cache_settings():
description="hello moto", description="hello moto",
) )
response.should.have.key("stageName").equals(new_stage_name) assert response["stageName"] == new_stage_name
response.should.have.key("deploymentId").equals(deployment_id) assert response["deploymentId"] == deployment_id
response.should.have.key("methodSettings").equals({}) assert response["methodSettings"] == {}
response.should.have.key("variables").equals({"env": "dev"}) assert response["variables"] == {"env": "dev"}
response.should.have.key("ResponseMetadata").should.have.key( assert response["ResponseMetadata"]["HTTPStatusCode"] == 201
"HTTPStatusCode" assert response["description"] == "hello moto"
).equals(201) assert response["cacheClusterStatus"] == "AVAILABLE"
response.should.have.key("description").equals("hello moto") assert response["cacheClusterEnabled"] is True
response.should.have.key("cacheClusterStatus").equals("AVAILABLE") assert response["cacheClusterSize"] == "1.6"
response.should.have.key("cacheClusterEnabled").equals(True) assert response["tracingEnabled"] is True
response.should.have.key("cacheClusterSize").equals("1.6")
response.should.have.key("tracingEnabled").equals(True)
stage = client.get_stage(restApiId=api_id, stageName=new_stage_name) stage = client.get_stage(restApiId=api_id, stageName=new_stage_name)
stage["stageName"].should.equal(new_stage_name) assert stage["stageName"] == new_stage_name
stage["deploymentId"].should.equal(deployment_id) assert stage["deploymentId"] == deployment_id
stage["variables"].should.have.key("env").which.should.match("dev") assert stage["variables"]["env"] == "dev"
stage["cacheClusterSize"].should.equal("1.6") assert stage["cacheClusterSize"] == "1.6"
@mock_apigateway @mock_apigateway
@ -171,8 +162,8 @@ def test_recreate_stage_from_deployment():
restApiId=api_id, stageName=stage_name, deploymentId=depl_id1 restApiId=api_id, stageName=stage_name, deploymentId=depl_id1
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ConflictException") assert err["Code"] == "ConflictException"
err["Message"].should.equal("Stage already exists") assert err["Message"] == "Stage already exists"
@mock_apigateway @mock_apigateway
@ -194,8 +185,8 @@ def test_create_stage_twice():
restApiId=api_id, stageName=new_stage_name, deploymentId=depl_id1 restApiId=api_id, stageName=new_stage_name, deploymentId=depl_id1
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ConflictException") assert err["Code"] == "ConflictException"
err["Message"].should.equal("Stage already exists") assert err["Message"] == "Stage already exists"
@mock_apigateway @mock_apigateway
@ -222,20 +213,20 @@ def test_delete_stage():
) )
stages = client.get_stages(restApiId=api_id)["item"] stages = client.get_stages(restApiId=api_id)["item"]
stage_names = [stage["stageName"] for stage in stages] stage_names = [stage["stageName"] for stage in stages]
stage_names.should.have.length_of(3) assert len(stage_names) == 3
stage_names.should.contain(stage_name) assert stage_name in stage_names
stage_names.should.contain(new_stage_name) assert new_stage_name in stage_names
stage_names.should.contain(new_stage_name_with_vars) assert new_stage_name_with_vars in stage_names
# delete stage # delete stage
response = client.delete_stage(restApiId=api_id, stageName=new_stage_name_with_vars) response = client.delete_stage(restApiId=api_id, stageName=new_stage_name_with_vars)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(202) assert response["ResponseMetadata"]["HTTPStatusCode"] == 202
# verify other stage still exists # verify other stage still exists
stages = client.get_stages(restApiId=api_id)["item"] stages = client.get_stages(restApiId=api_id)["item"]
stage_names = [stage["stageName"] for stage in stages] stage_names = [stage["stageName"] for stage in stages]
stage_names.should.have.length_of(2) assert len(stage_names) == 2
stage_names.shouldnt.contain(new_stage_name_with_vars) assert new_stage_name_with_vars not in stage_names
@mock_apigateway @mock_apigateway
@ -249,26 +240,26 @@ def test_delete_stage_created_by_deployment():
# Sanity check that the deployment exists # Sanity check that the deployment exists
depls = client.get_deployments(restApiId=api_id)["items"] depls = client.get_deployments(restApiId=api_id)["items"]
depls.should.have.length_of(1) assert len(depls) == 1
set(depls[0].keys()).should.equal({"id", "createdDate"}) assert set(depls[0].keys()) == {"id", "createdDate"}
# Sanity check that the stage exists # Sanity check that the stage exists
stage = client.get_stages(restApiId=api_id)["item"][0] stage = client.get_stages(restApiId=api_id)["item"][0]
stage.should.have.key("deploymentId").equals(depl_id1) assert stage["deploymentId"] == depl_id1
stage.should.have.key("stageName").equals(stage_name) assert stage["stageName"] == stage_name
# delete stage # delete stage
response = client.delete_stage(restApiId=api_id, stageName=stage_name) response = client.delete_stage(restApiId=api_id, stageName=stage_name)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(202) assert response["ResponseMetadata"]["HTTPStatusCode"] == 202
# verify no stage exists # verify no stage exists
stages = client.get_stages(restApiId=api_id) stages = client.get_stages(restApiId=api_id)
stages.should.have.key("item").equals([]) assert stages["item"] == []
# verify deployment still exists, unchanged # verify deployment still exists, unchanged
depls = client.get_deployments(restApiId=api_id)["items"] depls = client.get_deployments(restApiId=api_id)["items"]
depls.should.have.length_of(1) assert len(depls) == 1
set(depls[0].keys()).should.equal({"id", "createdDate"}) assert set(depls[0].keys()) == {"id", "createdDate"}
@mock_apigateway @mock_apigateway
@ -279,8 +270,8 @@ def test_delete_stage_unknown_stage():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.delete_stage(restApiId=api_id, stageName="unknown") client.delete_stage(restApiId=api_id, stageName="unknown")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Message"].should.equal("Invalid stage identifier specified") assert err["Message"] == "Invalid stage identifier specified"
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"
@mock_apigateway @mock_apigateway
@ -298,11 +289,9 @@ def test_update_stage_configuration():
response = client.get_deployment(restApiId=api_id, deploymentId=deployment_id) response = client.get_deployment(restApiId=api_id, deploymentId=deployment_id)
response.should.have.key("id").equals(deployment_id) assert response["id"] == deployment_id
response.should.have.key("ResponseMetadata").should.have.key( assert response["ResponseMetadata"]["HTTPStatusCode"] == 200
"HTTPStatusCode" assert response["description"] == "1.0.1"
).equals(200)
response.should.have.key("description").equals("1.0.1")
response = client.create_deployment( response = client.create_deployment(
restApiId=api_id, stageName=stage_name, description="1.0.2" restApiId=api_id, stageName=stage_name, description="1.0.2"
@ -310,10 +299,10 @@ def test_update_stage_configuration():
deployment_id2 = response["id"] deployment_id2 = response["id"]
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage["stageName"].should.equal(stage_name) assert stage["stageName"] == stage_name
stage["deploymentId"].should.equal(deployment_id2) assert stage["deploymentId"] == deployment_id2
stage.shouldnt.have.key("cacheClusterSize") assert "cacheClusterSize" not in stage
stage.shouldnt.have.key("cacheClusterStatus") assert "cacheClusterStatus" not in stage
client.update_stage( client.update_stage(
restApiId=api_id, restApiId=api_id,
@ -325,8 +314,8 @@ def test_update_stage_configuration():
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage.should.have.key("cacheClusterSize").which.should.equal("0.5") assert stage["cacheClusterSize"] == "0.5"
stage.should.have.key("cacheClusterStatus").equals("AVAILABLE") assert stage["cacheClusterStatus"] == "AVAILABLE"
client.update_stage( client.update_stage(
restApiId=api_id, restApiId=api_id,
@ -338,7 +327,7 @@ def test_update_stage_configuration():
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage.should.have.key("cacheClusterSize").which.should.equal("1.6") assert stage["cacheClusterSize"] == "1.6"
client.update_stage( client.update_stage(
restApiId=api_id, restApiId=api_id,
@ -368,16 +357,14 @@ def test_update_stage_configuration():
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage["description"].should.match("stage description update") assert stage["description"] == "stage description update"
stage["cacheClusterSize"].should.equal("1.6") assert stage["cacheClusterSize"] == "1.6"
stage["variables"]["environment"].should.match("dev") assert stage["variables"]["environment"] == "dev"
stage["variables"].should_not.have.key("region") assert "region" not in stage["variables"]
stage["cacheClusterEnabled"].should.equal(True) assert stage["cacheClusterEnabled"] is True
stage["deploymentId"].should.match(deployment_id) assert stage["deploymentId"] == deployment_id
stage["methodSettings"].should.have.key("*/*") assert "*/*" in stage["methodSettings"]
stage["methodSettings"]["*/*"].should.have.key( assert stage["methodSettings"]["*/*"]["cacheDataEncrypted"] is True
"cacheDataEncrypted"
).which.should.be.true
@mock_apigateway @mock_apigateway
@ -410,12 +397,10 @@ def test_update_stage_add_access_log_settings():
) )
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage.should.have.key("accessLogSettings").equals( assert stage["accessLogSettings"] == {
{ "format": "$context.identity.sourceIp msg",
"format": "$context.identity.sourceIp msg", "destinationArn": "arn:aws:logs:us-east-1:123456789012:log-group:foo-bar-x0hyv",
"destinationArn": "arn:aws:logs:us-east-1:123456789012:log-group:foo-bar-x0hyv", }
}
)
@mock_apigateway @mock_apigateway
@ -436,7 +421,7 @@ def test_update_stage_tracing_disabled():
) )
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage.should.have.key("tracingEnabled").equals(False) assert stage["tracingEnabled"] is False
client.update_stage( client.update_stage(
restApiId=api_id, restApiId=api_id,
@ -445,7 +430,7 @@ def test_update_stage_tracing_disabled():
) )
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage.should.have.key("tracingEnabled").equals(True) assert stage["tracingEnabled"] is True
@mock_apigateway @mock_apigateway
@ -467,7 +452,7 @@ def test_update_stage_remove_access_log_settings():
) )
stage = client.get_stage(restApiId=api_id, stageName=stage_name) stage = client.get_stage(restApiId=api_id, stageName=stage_name)
stage.shouldnt.have.key("accessLogSettings") assert "accessLogSettings" not in stage
@mock_apigateway @mock_apigateway
@ -491,9 +476,10 @@ def test_update_stage_configuration_unknown_operation():
], ],
) )
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("ValidationException") assert err["Code"] == "ValidationException"
err["Message"].should.equal( assert (
"Member must satisfy enum value set: [add, remove, move, test, replace, copy]" err["Message"]
== "Member must satisfy enum value set: [add, remove, move, test, replace, copy]"
) )
@ -506,4 +492,4 @@ def test_non_existent_stage():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_stage(restApiId=api_id, stageName="xxx") client.get_stage(restApiId=api_id, stageName="xxx")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"

View File

@ -1,5 +1,4 @@
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_apigateway from moto import mock_apigateway
from botocore.exceptions import ClientError from botocore.exceptions import ClientError
import pytest import pytest
@ -19,27 +18,24 @@ def test_create_request_validator():
response = create_validator(client, api_id) response = create_validator(client, api_id)
response.pop(RESPONSE_METADATA) response.pop(RESPONSE_METADATA)
response.pop(ID) response.pop(ID)
response.should.equal( assert response == {
{ NAME: PARAM_NAME,
NAME: PARAM_NAME, VALIDATE_REQUEST_BODY: True,
VALIDATE_REQUEST_BODY: True, VALIDATE_REQUEST_PARAMETERS: True,
VALIDATE_REQUEST_PARAMETERS: True, }
}
)
@mock_apigateway @mock_apigateway
def test_get_request_validators(): def test_get_request_validators():
client = create_client() client = create_client()
api_id = create_rest_api_id(client) api_id = create_rest_api_id(client)
response = client.get_request_validators(restApiId=api_id) response = client.get_request_validators(restApiId=api_id)
validators = response["items"] validators = response["items"]
validators.should.have.length_of(0) assert len(validators) == 0
response.pop(RESPONSE_METADATA) response.pop(RESPONSE_METADATA)
response.should.equal({"items": []}) assert response == {"items": []}
response = create_validator(client, api_id) response = create_validator(client, api_id)
validator_id1 = response[ID] validator_id1 = response[ID]
@ -48,27 +44,25 @@ def test_get_request_validators():
response = client.get_request_validators(restApiId=api_id) response = client.get_request_validators(restApiId=api_id)
validators = response["items"] validators = response["items"]
validators.should.have.length_of(2) assert len(validators) == 2
response.pop(RESPONSE_METADATA) response.pop(RESPONSE_METADATA)
response.should.equal( assert response == {
{ "items": [
"items": [ {
{ ID: validator_id1,
ID: validator_id1, NAME: PARAM_NAME,
NAME: PARAM_NAME, VALIDATE_REQUEST_BODY: True,
VALIDATE_REQUEST_BODY: True, VALIDATE_REQUEST_PARAMETERS: True,
VALIDATE_REQUEST_PARAMETERS: True, },
}, {
{ ID: validator_id2,
ID: validator_id2, NAME: PARAM_NAME,
NAME: PARAM_NAME, VALIDATE_REQUEST_BODY: True,
VALIDATE_REQUEST_BODY: True, VALIDATE_REQUEST_PARAMETERS: True,
VALIDATE_REQUEST_PARAMETERS: True, },
}, ]
] }
}
)
@mock_apigateway @mock_apigateway
@ -81,14 +75,12 @@ def test_get_request_validator():
restApiId=api_id, requestValidatorId=validator_id restApiId=api_id, requestValidatorId=validator_id
) )
response.pop(RESPONSE_METADATA) response.pop(RESPONSE_METADATA)
response.should.equal( assert response == {
{ ID: validator_id,
ID: validator_id, NAME: PARAM_NAME,
NAME: PARAM_NAME, VALIDATE_REQUEST_BODY: True,
VALIDATE_REQUEST_BODY: True, VALIDATE_REQUEST_PARAMETERS: True,
VALIDATE_REQUEST_PARAMETERS: True, }
}
)
@mock_apigateway @mock_apigateway
@ -103,14 +95,12 @@ def test_delete_request_validator():
) )
response.pop(RESPONSE_METADATA) response.pop(RESPONSE_METADATA)
response.should.equal( assert response == {
{ ID: validator_id,
ID: validator_id, NAME: PARAM_NAME,
NAME: PARAM_NAME, VALIDATE_REQUEST_BODY: True,
VALIDATE_REQUEST_BODY: True, VALIDATE_REQUEST_PARAMETERS: True,
VALIDATE_REQUEST_PARAMETERS: True, }
}
)
# delete validator # delete validator
response = client.delete_request_validator( response = client.delete_request_validator(
@ -119,8 +109,8 @@ def test_delete_request_validator():
with pytest.raises(ClientError) as ex: with pytest.raises(ClientError) as ex:
client.get_request_validator(restApiId=api_id, requestValidatorId=validator_id) client.get_request_validator(restApiId=api_id, requestValidatorId=validator_id)
err = ex.value.response["Error"] err = ex.value.response["Error"]
err["Code"].should.equal("BadRequestException") assert err["Code"] == "BadRequestException"
err["Message"].should.equal("Invalid Request Validator Id specified") assert err["Message"] == "Invalid Request Validator Id specified"
@mock_apigateway @mock_apigateway
@ -140,14 +130,12 @@ def test_update_request_validator():
], ],
) )
response.pop(RESPONSE_METADATA) response.pop(RESPONSE_METADATA)
response.should.equal( assert response == {
{ ID: validator_id,
ID: validator_id, NAME: PARAM_NAME + PARAM_NAME,
NAME: PARAM_NAME + PARAM_NAME, VALIDATE_REQUEST_BODY: False,
VALIDATE_REQUEST_BODY: False, VALIDATE_REQUEST_PARAMETERS: False,
VALIDATE_REQUEST_PARAMETERS: False, }
}
)
def create_validator(client, api_id): def create_validator(client, api_id):

View File

@ -10,7 +10,7 @@ def test_get_vpc_links_empty():
client = boto3.client("apigateway", region_name="eu-west-1") client = boto3.client("apigateway", region_name="eu-west-1")
resp = client.get_vpc_links() resp = client.get_vpc_links()
resp.should.have.key("items").equals([]) assert resp["items"] == []
@mock_apigateway @mock_apigateway
@ -24,12 +24,12 @@ def test_create_vpc_link():
tags={"key1": "value1"}, tags={"key1": "value1"},
) )
resp.should.have.key("id") assert "id" in resp
resp.should.have.key("name").equals("vpcl") assert resp["name"] == "vpcl"
resp.should.have.key("description").equals("my first vpc link") assert resp["description"] == "my first vpc link"
resp.should.have.key("targetArns").equals(["elb:target:arn"]) assert resp["targetArns"] == ["elb:target:arn"]
resp.should.have.key("status").equals("AVAILABLE") assert resp["status"] == "AVAILABLE"
resp.should.have.key("tags").equals({"key1": "value1"}) assert resp["tags"] == {"key1": "value1"}
@mock_apigateway @mock_apigateway
@ -45,12 +45,12 @@ def test_get_vpc_link():
resp = client.get_vpc_link(vpcLinkId=vpc_link_id) resp = client.get_vpc_link(vpcLinkId=vpc_link_id)
resp.should.have.key("id") assert "id" in resp
resp.should.have.key("name").equals("vpcl") assert resp["name"] == "vpcl"
resp.should.have.key("description").equals("my first vpc link") assert resp["description"] == "my first vpc link"
resp.should.have.key("targetArns").equals(["elb:target:arn"]) assert resp["targetArns"] == ["elb:target:arn"]
resp.should.have.key("status").equals("AVAILABLE") assert resp["status"] == "AVAILABLE"
resp.should.have.key("tags").equals({"key1": "value1"}) assert resp["tags"] == {"key1": "value1"}
@mock_apigateway @mock_apigateway
@ -60,8 +60,8 @@ def test_get_vpc_link_unknown():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_vpc_link(vpcLinkId="unknown") client.get_vpc_link(vpcLinkId="unknown")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NotFoundException") assert err["Code"] == "NotFoundException"
err["Message"].should.equal("VPCLink not found") assert err["Message"] == "VPCLink not found"
@mock_apigateway @mock_apigateway
@ -76,8 +76,8 @@ def test_get_vpc_links():
)["id"] )["id"]
links = client.get_vpc_links()["items"] links = client.get_vpc_links()["items"]
links.should.have.length_of(1) assert len(links) == 1
links[0]["id"].should.equal(vpc_link_id) assert links[0]["id"] == vpc_link_id
client.create_vpc_link( client.create_vpc_link(
name="vpcl2", name="vpcl2",
@ -87,7 +87,7 @@ def test_get_vpc_links():
) )
links = client.get_vpc_links()["items"] links = client.get_vpc_links()["items"]
links.should.have.length_of(2) assert len(links) == 2
@mock_apigateway @mock_apigateway
@ -102,9 +102,9 @@ def test_delete_vpc_link():
)["id"] )["id"]
links = client.get_vpc_links()["items"] links = client.get_vpc_links()["items"]
links.should.have.length_of(1) assert len(links) == 1
client.delete_vpc_link(vpcLinkId=vpc_link_id) client.delete_vpc_link(vpcLinkId=vpc_link_id)
links = client.get_vpc_links()["items"] links = client.get_vpc_links()["items"]
links.should.have.length_of(0) assert len(links) == 0

View File

@ -1,4 +1,3 @@
import sure # noqa # pylint: disable=unused-import
import json import json
import moto.server as server import moto.server as server
@ -13,7 +12,7 @@ def test_list_apis():
test_client = backend.test_client() test_client = backend.test_client()
res = test_client.get("/restapis") res = test_client.get("/restapis")
json.loads(res.data).should.contain("item") assert "item" in json.loads(res.data)
def test_usage_plans_apis(): def test_usage_plans_apis():
@ -22,33 +21,33 @@ def test_usage_plans_apis():
# List usage plans (expect empty) # List usage plans (expect empty)
res = test_client.get("/usageplans") res = test_client.get("/usageplans")
json.loads(res.data)["item"].should.have.length_of(0) assert len(json.loads(res.data)["item"]) == 0
# Create usage plan # Create usage plan
res = test_client.post("/usageplans", data=json.dumps({"name": "test"})) res = test_client.post("/usageplans", data=json.dumps({"name": "test"}))
created_plan = json.loads(res.data) created_plan = json.loads(res.data)
created_plan["name"].should.equal("test") assert created_plan["name"] == "test"
# List usage plans (expect 1 plan) # List usage plans (expect 1 plan)
res = test_client.get("/usageplans") res = test_client.get("/usageplans")
json.loads(res.data)["item"].should.have.length_of(1) assert len(json.loads(res.data)["item"]) == 1
# Get single usage plan # Get single usage plan
res = test_client.get(f"/usageplans/{created_plan['id']}") res = test_client.get(f"/usageplans/{created_plan['id']}")
fetched_plan = json.loads(res.data) fetched_plan = json.loads(res.data)
fetched_plan.should.equal(created_plan) assert fetched_plan == created_plan
# Not existing usage plan # Not existing usage plan
res = test_client.get("/usageplans/not_existing") res = test_client.get("/usageplans/not_existing")
res.status_code.should.equal(404) assert res.status_code == 404
# Delete usage plan # Delete usage plan
res = test_client.delete(f"/usageplans/{created_plan['id']}") res = test_client.delete(f"/usageplans/{created_plan['id']}")
res.data.should.equal(b"{}") assert res.data == b"{}"
# List usage plans (expect empty again) # List usage plans (expect empty again)
res = test_client.get("/usageplans") res = test_client.get("/usageplans")
json.loads(res.data)["item"].should.have.length_of(0) assert len(json.loads(res.data)["item"]) == 0
def test_usage_plans_keys(): def test_usage_plans_keys():
@ -62,19 +61,19 @@ def test_usage_plans_keys():
# List usage plans keys (expect empty) # List usage plans keys (expect empty)
res = test_client.get(f"/usageplans/{usage_plan_id}/keys") res = test_client.get(f"/usageplans/{usage_plan_id}/keys")
json.loads(res.data)["item"].should.have.length_of(0) assert len(json.loads(res.data)["item"]) == 0
# Invalid api key (does not exists at all) # Invalid api key (does not exists at all)
res = test_client.get(f"/usageplans/{usage_plan_id}/keys/not_existing") res = test_client.get(f"/usageplans/{usage_plan_id}/keys/not_existing")
res.status_code.should.equal(404) assert res.status_code == 404
# not existing usage plan with existing api key # not existing usage plan with existing api key
res = test_client.get(f"/usageplans/not_existing/keys/{created_api_key['id']}") res = test_client.get(f"/usageplans/not_existing/keys/{created_api_key['id']}")
res.status_code.should.equal(404) assert res.status_code == 404
# not jet added api key # not jet added api key
res = test_client.get(f"/usageplans/{usage_plan_id}/keys/{created_api_key['id']}") res = test_client.get(f"/usageplans/{usage_plan_id}/keys/{created_api_key['id']}")
res.status_code.should.equal(404) assert res.status_code == 404
# Create usage plan key # Create usage plan key
res = test_client.post( res = test_client.post(
@ -85,22 +84,22 @@ def test_usage_plans_keys():
# List usage plans keys (expect 1 key) # List usage plans keys (expect 1 key)
res = test_client.get(f"/usageplans/{usage_plan_id}/keys") res = test_client.get(f"/usageplans/{usage_plan_id}/keys")
json.loads(res.data)["item"].should.have.length_of(1) assert len(json.loads(res.data)["item"]) == 1
# Get single usage plan key # Get single usage plan key
res = test_client.get(f"/usageplans/{usage_plan_id}/keys/{created_api_key['id']}") res = test_client.get(f"/usageplans/{usage_plan_id}/keys/{created_api_key['id']}")
fetched_plan_key = json.loads(res.data) fetched_plan_key = json.loads(res.data)
fetched_plan_key.should.equal(created_usage_plan_key) assert fetched_plan_key == created_usage_plan_key
# Delete usage plan key # Delete usage plan key
res = test_client.delete( res = test_client.delete(
f"/usageplans/{usage_plan_id}/keys/{created_api_key['id']}" f"/usageplans/{usage_plan_id}/keys/{created_api_key['id']}"
) )
res.data.should.equal(b"{}") assert res.data == b"{}"
# List usage plans keys (expect to be empty again) # List usage plans keys (expect to be empty again)
res = test_client.get(f"/usageplans/{usage_plan_id}/keys") res = test_client.get(f"/usageplans/{usage_plan_id}/keys")
json.loads(res.data)["item"].should.have.length_of(0) assert len(json.loads(res.data)["item"]) == 0
def test_create_usage_plans_key_non_existent_api_key(): def test_create_usage_plans_key_non_existent_api_key():
@ -113,7 +112,7 @@ def test_create_usage_plans_key_non_existent_api_key():
f"/usageplans/{usage_plan_id}/keys", f"/usageplans/{usage_plan_id}/keys",
data=json.dumps({"keyId": "non-existent", "keyType": "API_KEY"}), data=json.dumps({"keyId": "non-existent", "keyType": "API_KEY"}),
) )
res.status_code.should.equal(404) assert res.status_code == 404
def test_put_integration_response_without_body(): def test_put_integration_response_without_body():
@ -128,8 +127,8 @@ def test_put_integration_response_without_body():
# client.put_integration_response( # client.put_integration_response(
# restApiId=api_id, resourceId=root_id, httpMethod="GET", statusCode="200" # restApiId=api_id, resourceId=root_id, httpMethod="GET", statusCode="200"
# ) # )
# ex.value.response["Error"]["Code"].should.equal("BadRequestException") # assert ex.value.response["Error"]["Code"] == "BadRequestException"
# ex.value.response["Error"]["Message"].should.equal("Invalid request input") # assert ex.value.response["Error"]["Message"] == "Invalid request input"
# #
# As a workaround, we can create a PUT-request without body, which will force the error # As a workaround, we can create a PUT-request without body, which will force the error
# Related: # https://github.com/aws/aws-sdk-js/issues/2588 # Related: # https://github.com/aws/aws-sdk-js/issues/2588
@ -140,7 +139,8 @@ def test_put_integration_response_without_body():
res = test_client.put( res = test_client.put(
"/restapis/f_id/resources/r_id/methods/m_id/integration/responses/200/" "/restapis/f_id/resources/r_id/methods/m_id/integration/responses/200/"
) )
res.status_code.should.equal(400) assert res.status_code == 400
json.loads(res.data).should.equal( assert json.loads(res.data) == {
{"__type": "BadRequestException", "message": "Invalid request input"} "__type": "BadRequestException",
) "message": "Invalid request input",
}