Improve test coverage (#4098)

This commit is contained in:
Bert Blommers 2021-07-28 11:17:15 +01:00 committed by GitHub
parent d248191cb4
commit a1905ad584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 782 additions and 80 deletions

View File

@ -24,14 +24,13 @@ Unreleased
* create_transit_gateway_route() * create_transit_gateway_route()
* create_transit_gateway_route_table() * create_transit_gateway_route_table()
* create_transit_gateway_vpc_attachment() * create_transit_gateway_vpc_attachment()
* create_transit_gateway_vpn_attachment()
* delete_transit_gateway() * delete_transit_gateway()
* delete_transit_gateway_route() * delete_transit_gateway_route()
* delete_transit_gateway_route_table() * delete_transit_gateway_route_table()
* describe_transit_gateway_attachments() * describe_transit_gateway_attachments()
* describe_transit_gateway_route_tables()
* describe_transit_gateway_vpc_attachments() * describe_transit_gateway_vpc_attachments()
* get_all_transit_gateways() * describe_transit_gateways()
* get_all_transit_gateway_route_tables()
* modify_transit_gateway() * modify_transit_gateway()
* search_transit_gateway_routes() * search_transit_gateway_routes()
* Events: * Events:

View File

@ -91,7 +91,7 @@ class NoIntegrationResponseDefined(NotFoundException):
def __init__(self, code=None): def __init__(self, code=None):
super(NoIntegrationResponseDefined, self).__init__( super(NoIntegrationResponseDefined, self).__init__(
"NotFoundException", "No integration defined for method, code '%s'" % code "NotFoundException", "Invalid Response status code specified"
) )
@ -161,7 +161,7 @@ class DomainNameNotFound(NotFoundException):
def __init__(self): def __init__(self):
super(DomainNameNotFound, self).__init__( super(DomainNameNotFound, self).__init__(
"NotFoundException", "Invalid Domain Name specified" "NotFoundException", "Invalid domain name identifier specified"
) )

View File

@ -21,6 +21,7 @@ from .exceptions import (
ApiKeyValueMinLength, ApiKeyValueMinLength,
InvalidRequestInput, InvalidRequestInput,
NoIntegrationDefined, NoIntegrationDefined,
NoIntegrationResponseDefined,
NotFoundException, NotFoundException,
) )
@ -368,26 +369,21 @@ class APIGatewayResponse(BaseResponse):
function_id = url_path_parts[2] function_id = url_path_parts[2]
stage_name = url_path_parts[4] stage_name = url_path_parts[4]
if self.method == "GET": try:
try: if self.method == "GET":
stage_response = self.backend.get_stage(function_id, stage_name) stage_response = self.backend.get_stage(function_id, stage_name)
except StageNotFoundException as error:
return ( elif self.method == "PATCH":
error.code, patch_operations = self._get_param("patchOperations")
{}, stage_response = self.backend.update_stage(
'{{"message":"{0}","code":"{1}"}}'.format( function_id, stage_name, patch_operations
error.message, error.error_type
),
) )
elif self.method == "PATCH": elif self.method == "DELETE":
patch_operations = self._get_param("patchOperations") self.backend.delete_stage(function_id, stage_name)
stage_response = self.backend.update_stage( return 202, {}, "{}"
function_id, stage_name, patch_operations return 200, {}, json.dumps(stage_response)
) except StageNotFoundException as error:
elif self.method == "DELETE": return error.code, {}, error.get_body()
self.backend.delete_stage(function_id, stage_name)
return 202, {}, "{}"
return 200, {}, json.dumps(stage_response)
def integrations(self, request, full_url, headers): def integrations(self, request, full_url, headers):
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)
@ -476,7 +472,7 @@ class APIGatewayResponse(BaseResponse):
return 200, {}, json.dumps(integration_response) return 200, {}, json.dumps(integration_response)
except BadRequestException as e: except BadRequestException as e:
return self.error("BadRequestException", e.message) return self.error("BadRequestException", e.message)
except NoIntegrationDefined as e: except (NoIntegrationDefined, NoIntegrationResponseDefined) as e:
return self.error("NotFoundException", e.message) return self.error("NotFoundException", e.message)
def deployments(self, request, full_url, headers): def deployments(self, request, full_url, headers):
@ -552,9 +548,12 @@ class APIGatewayResponse(BaseResponse):
status_code = 200 status_code = 200
if self.method == "GET": if self.method == "GET":
include_value = self._get_bool_param("includeValue") include_value = self._get_bool_param("includeValue")
apikey_response = self.backend.get_api_key( try:
apikey, include_value=include_value apikey_response = self.backend.get_api_key(
) apikey, include_value=include_value
)
except ApiKeyNotFoundException as e:
return self.error("NotFoundException", e.message)
elif self.method == "PATCH": elif self.method == "PATCH":
patch_operations = self._get_param("patchOperations") patch_operations = self._get_param("patchOperations")
apikey_response = self.backend.update_api_key(apikey, patch_operations) apikey_response = self.backend.update_api_key(apikey, patch_operations)
@ -720,13 +719,7 @@ class APIGatewayResponse(BaseResponse):
return 404, {}, json.dumps({"error": msg}) return 404, {}, json.dumps({"error": msg})
return 200, {}, json.dumps(domain_names) return 200, {}, json.dumps(domain_names)
except DomainNameNotFound as error: except DomainNameNotFound as error:
return ( return self.error("NotFoundException", error.message)
error.code,
{},
'{{"message":"{0}","code":"{1}"}}'.format(
error.message, error.error_type
),
)
def models(self, request, full_url, headers): def models(self, request, full_url, headers):
self.setup_class(request, full_url, headers) self.setup_class(request, full_url, headers)

View File

@ -26,7 +26,7 @@ import requests.exceptions
from boto3 import Session from boto3 import Session
from moto.awslambda.policy import Policy from moto.awslambda.policy import Policy
from moto.core import BaseBackend, BaseModel, CloudFormationModel from moto.core import BaseBackend, CloudFormationModel
from moto.core.exceptions import RESTError from moto.core.exceptions import RESTError
from moto.iam.models import iam_backend from moto.iam.models import iam_backend
from moto.iam.exceptions import IAMNotFoundException from moto.iam.exceptions import IAMNotFoundException
@ -1072,32 +1072,6 @@ class LayerStorage(object):
return None return None
class LambdaPermission(BaseModel):
def __init__(self, spec):
self.action = spec["Action"]
self.function_name = spec["FunctionName"]
self.principal = spec["Principal"]
# optional
self.source_account = spec.get("SourceAccount")
self.source_arn = spec.get("SourceArn")
@classmethod
def create_from_cloudformation_json(
cls, resource_name, cloudformation_json, region_name
):
properties = cloudformation_json["Properties"]
spec = {
"Action": properties["Action"],
"FunctionName": properties["FunctionName"],
"Principal": properties["Principal"],
}
optional_properties = "SourceAccount SourceArn".split()
for prop in optional_properties:
if prop in properties:
spec[prop] = properties[prop]
return LambdaPermission(spec)
class LambdaBackend(BaseBackend): class LambdaBackend(BaseBackend):
def __init__(self, region_name): def __init__(self, region_name):
self._lambdas = LambdaStorage() self._lambdas = LambdaStorage()

View File

@ -32,7 +32,7 @@ class CognitoIdentityResponse(BaseResponse):
pool_name = self._get_param("IdentityPoolName") pool_name = self._get_param("IdentityPoolName")
allow_unauthenticated = self._get_bool_param("AllowUnauthenticatedIdentities") allow_unauthenticated = self._get_bool_param("AllowUnauthenticatedIdentities")
allow_classic = self._get_bool_param("AllowClassicFlow") allow_classic = self._get_bool_param("AllowClassicFlow")
login_providers = self._get_multi_param_dict("SupportedLoginProviders") login_providers = self._get_param("SupportedLoginProviders")
provider_name = self._get_param("DeveloperProviderName") provider_name = self._get_param("DeveloperProviderName")
provider_arns = self._get_multi_param("OpenIdConnectProviderARNs") provider_arns = self._get_multi_param("OpenIdConnectProviderARNs")
identity_providers = self._get_multi_param_dict("CognitoIdentityProviders") identity_providers = self._get_multi_param_dict("CognitoIdentityProviders")

View File

@ -5944,12 +5944,11 @@ class TransitGateway(TaggedEC2Resource, CloudFormationModel):
"VpnEcmpSupport": "enable", "VpnEcmpSupport": "enable",
} }
def __init__(self, backend, description=None, options=None, tags=None): def __init__(self, backend, description=None, options=None):
self.ec2_backend = backend self.ec2_backend = backend
self.id = random_transit_gateway_id() self.id = random_transit_gateway_id()
self.description = description self.description = description
self.state = "available" self.state = "available"
self.add_tags(tags or {})
self.options = merge_multiple_dicts(self.DEFAULT_OPTIONS, options or {}) self.options = merge_multiple_dicts(self.DEFAULT_OPTIONS, options or {})
self._created_at = datetime.utcnow() self._created_at = datetime.utcnow()
@ -5979,10 +5978,19 @@ class TransitGateway(TaggedEC2Resource, CloudFormationModel):
cls, resource_name, cloudformation_json, region_name cls, resource_name, cloudformation_json, region_name
): ):
ec2_backend = ec2_backends[region_name] ec2_backend = ec2_backends[region_name]
properties = cloudformation_json["Properties"]
description = properties["Description"]
options = dict(properties)
del options["Description"]
transit_gateway = ec2_backend.create_transit_gateway( transit_gateway = ec2_backend.create_transit_gateway(
cloudformation_json["Properties"]["Description"], description=description, options=options
cloudformation_json["Properties"]["Options"],
) )
for tag in properties.get("Tags", []):
tag_key = tag["Key"]
tag_value = tag["Value"]
transit_gateway.add_tag(tag_key, tag_value)
return transit_gateway return transit_gateway
@ -5991,8 +5999,13 @@ class TransitGatewayBackend(object):
self.transit_gateways = {} self.transit_gateways = {}
super(TransitGatewayBackend, self).__init__() super(TransitGatewayBackend, self).__init__()
def create_transit_gateway(self, description=None, options=None, tags=None): def create_transit_gateway(self, description=None, options=None, tags=[]):
transit_gateway = TransitGateway(self, description, options, tags) transit_gateway = TransitGateway(self, description, options)
for tag in tags:
tag_key = tag.get("Key")
tag_value = tag.get("Value")
transit_gateway.add_tag(tag_key, tag_value)
self.transit_gateways[transit_gateway.id] = transit_gateway self.transit_gateways[transit_gateway.id] = transit_gateway
return transit_gateway return transit_gateway
@ -6200,6 +6213,7 @@ class TransitGatewayAttachment(TaggedEC2Resource):
self.add_tags(tags or {}) self.add_tags(tags or {})
self._created_at = datetime.utcnow() self._created_at = datetime.utcnow()
self.owner_id = ACCOUNT_ID
@property @property
def create_time(self): def create_time(self):

0
moto/ec2/regions.py Normal file
View File

View File

@ -111,7 +111,7 @@ DESCRIBE_TRANSIT_GATEWAY_ATTACHMENTS = """<DescribeTransitGatewayAttachmentsResp
</tagSet> </tagSet>
<transitGatewayAttachmentId>{{ transit_gateway_attachment.id }}</transitGatewayAttachmentId> <transitGatewayAttachmentId>{{ transit_gateway_attachment.id }}</transitGatewayAttachmentId>
<transitGatewayId>{{ transit_gateway_attachment.transit_gateway_id }}</transitGatewayId> <transitGatewayId>{{ transit_gateway_attachment.transit_gateway_id }}</transitGatewayId>
<transitGatewayOwnerId>074255357339</transitGatewayOwnerId> <transitGatewayOwnerId>{{ transit_gateway_attachment.resource_owner_id }}</transitGatewayOwnerId>
</item> </item>
{% endfor %} {% endfor %}
</transitGatewayAttachments> </transitGatewayAttachments>
@ -133,7 +133,7 @@ DESCRIBE_TRANSIT_GATEWAY_VPC_ATTACHMENTS = """<DescribeTransitGatewayVpcAttachme
<state>{{ transit_gateway_vpc_attachment.state }}</state> <state>{{ transit_gateway_vpc_attachment.state }}</state>
<subnetIds> <subnetIds>
{% for id in transit_gateway_vpc_attachment.subnet_ids %} {% for id in transit_gateway_vpc_attachment.subnet_ids %}
<item>id</item> <item>{{ id }}</item>
{% endfor %} {% endfor %}
</subnetIds> </subnetIds>
<tagSet> <tagSet>
@ -147,7 +147,7 @@ DESCRIBE_TRANSIT_GATEWAY_VPC_ATTACHMENTS = """<DescribeTransitGatewayVpcAttachme
<transitGatewayAttachmentId>{{ transit_gateway_vpc_attachment.id }}</transitGatewayAttachmentId> <transitGatewayAttachmentId>{{ transit_gateway_vpc_attachment.id }}</transitGatewayAttachmentId>
<transitGatewayId>{{ transit_gateway_vpc_attachment.transit_gateway_id }}</transitGatewayId> <transitGatewayId>{{ transit_gateway_vpc_attachment.transit_gateway_id }}</transitGatewayId>
<vpcId>{{ transit_gateway_vpc_attachment.vpc_id }}</vpcId> <vpcId>{{ transit_gateway_vpc_attachment.vpc_id }}</vpcId>
<vpcOwnerId>074255357339</vpcOwnerId> <vpcOwnerId>{{ transit_gateway_vpc_attachment.resource_owner_id }}</vpcOwnerId>
</item> </item>
{% endfor %} {% endfor %}
</transitGatewayVpcAttachments> </transitGatewayVpcAttachments>

View File

@ -8,9 +8,8 @@ class TransitGateways(BaseResponse):
description = self._get_param("Description") or None description = self._get_param("Description") or None
options = self._get_multi_param_dict("Options") options = self._get_multi_param_dict("Options")
tags = self._get_multi_param("TagSpecification") tags = self._get_multi_param("TagSpecification")
tags = tags[0] if isinstance(tags, list) and len(tags) == 1 else tags if tags:
tags = (tags or {}).get("Tag", []) tags = tags[0].get("Tag")
tags = {t["Key"]: t["Value"] for t in tags}
transit_gateway = self.ec2_backend.create_transit_gateway( transit_gateway = self.ec2_backend.create_transit_gateway(
description=description, options=options, tags=tags description=description, options=options, tags=tags

View File

@ -1705,12 +1705,6 @@ def test_get_domain_names():
def test_get_domain_name(): def test_get_domain_name():
client = boto3.client("apigateway", region_name="us-west-2") client = boto3.client("apigateway", region_name="us-west-2")
domain_name = "testDomain" domain_name = "testDomain"
# quering an invalid domain name which is not present
with pytest.raises(ClientError) as ex:
client.get_domain_name(domainName=domain_name)
ex.value.response["Error"]["Message"].should.equal("Invalid Domain Name specified")
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
# adding a domain name # adding a domain name
client.create_domain_name(domainName=domain_name) client.create_domain_name(domainName=domain_name)
# retrieving the data of added domain name. # retrieving the data of added domain name.
@ -2278,3 +2272,87 @@ def create_method_integration(client, api_id, httpMethod="GET"):
statusCode="200", statusCode="200",
responseTemplates={}, responseTemplates={},
) )
return root_id
@mock_apigateway
def test_get_integration_response_unknown_response():
region_name = "us-west-2"
client = boto3.client("apigateway", region_name=region_name)
response = client.create_rest_api(name="my_api", description="this is my api")
api_id = response["id"]
root_id = create_method_integration(client, api_id)
client.get_integration_response(
restApiId=api_id, resourceId=root_id, httpMethod="GET", statusCode="200"
)
with pytest.raises(ClientError) as ex:
client.get_integration_response(
restApiId=api_id, resourceId=root_id, httpMethod="GET", statusCode="300"
)
ex.value.response["Error"]["Message"].should.equal(
"Invalid Response status code specified"
)
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
@mock_apigateway
def test_delete_stage_unknown_stage():
client = boto3.client("apigateway", region_name="us-west-2")
response = client.create_rest_api(name="my_api", description="this is my api")
api_id = response["id"]
with pytest.raises(ClientError) as ex:
client.delete_stage(restApiId=api_id, stageName="unknown")
ex.value.response["Error"]["Message"].should.equal(
"Invalid stage identifier specified"
)
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
@mock_apigateway
def test_get_api_key_unknown_apikey():
client = boto3.client("apigateway", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.get_api_key(apiKey="unknown")
ex.value.response["Error"]["Message"].should.equal(
"Invalid API Key identifier specified"
)
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
@mock_apigateway
def test_get_domain_name_unknown_domainname():
client = boto3.client("apigateway", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.get_domain_name(domainName="www.google.com")
ex.value.response["Error"]["Message"].should.equal(
"Invalid domain name identifier specified"
)
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
@mock_apigateway
def test_update_domain_name_unknown_domainname():
client = boto3.client("apigateway", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.update_domain_name(domainName="www.google.fr", patchOperations=[])
ex.value.response["Error"]["Message"].should.equal(
"Invalid domain name identifier specified"
)
ex.value.response["Error"]["Code"].should.equal("NotFoundException")
@mock_apigateway
def test_delete_domain_name_unknown_domainname():
client = boto3.client("apigateway", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.delete_domain_name(domainName="www.google.com")
ex.value.response["Error"]["Message"].should.equal(
"Invalid domain name identifier specified"
)
ex.value.response["Error"]["Code"].should.equal("NotFoundException")

View File

@ -71,6 +71,43 @@ def test_describe_identity_pool():
assert result["SamlProviderARNs"] == res["SamlProviderARNs"] assert result["SamlProviderARNs"] == res["SamlProviderARNs"]
@pytest.mark.parametrize(
"key,initial_value,updated_value",
[
(
"SupportedLoginProviders",
{"graph.facebook.com": "123456789012345"},
{"graph.facebook.com": "123456789012345", "graph.google.com": "00000000"},
),
("SupportedLoginProviders", {"graph.facebook.com": "123456789012345"}, {}),
("DeveloperProviderName", "dev1", "dev2"),
],
)
@mock_cognitoidentity
def test_update_identity_pool(key, initial_value, updated_value):
conn = boto3.client("cognito-identity", "us-west-2")
res = conn.create_identity_pool(
IdentityPoolName="TestPool",
AllowUnauthenticatedIdentities=False,
**dict({key: initial_value}),
)
first = conn.describe_identity_pool(IdentityPoolId=res["IdentityPoolId"])
first[key].should.equal(initial_value)
response = conn.update_identity_pool(
IdentityPoolId=res["IdentityPoolId"],
IdentityPoolName="TestPool",
AllowUnauthenticatedIdentities=False,
**dict({key: updated_value}),
)
response[key].should.equal(updated_value)
second = conn.describe_identity_pool(IdentityPoolId=res["IdentityPoolId"])
second[key].should.equal(response[key])
@mock_cognitoidentity @mock_cognitoidentity
def test_describe_identity_pool_with_invalid_id_raises_error(): def test_describe_identity_pool_with_invalid_id_raises_error():
conn = boto3.client("cognito-identity", "us-west-2") conn = boto3.client("cognito-identity", "us-west-2")

View File

@ -0,0 +1,406 @@
import boto3
import sure # noqa
from moto import mock_ec2
from moto.core import ACCOUNT_ID
@mock_ec2
def test_describe_transit_gateways():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.describe_transit_gateways()
response.should.have.key("TransitGateways").equal([])
@mock_ec2
def test_create_transit_gateway():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.create_transit_gateway(
Description="my first gateway", Options={"DnsSupport": "disable"}
)
gateway = response["TransitGateway"]
gateway.should.have.key("TransitGatewayId").match("tgw-[a-z0-9]+")
gateway.should.have.key("State").equal("available")
gateway.should.have.key("OwnerId").equal(ACCOUNT_ID)
gateway.should.have.key("Description").equal("my first gateway")
gateway.should.have.key("Tags").equal([])
options = gateway["Options"]
options.should.have.key("AmazonSideAsn").equal(64512)
options.should.have.key("TransitGatewayCidrBlocks").equal([])
options.should.have.key("AutoAcceptSharedAttachments").equal("disable")
options.should.have.key("DefaultRouteTableAssociation").equal("enable")
options.should.have.key("DefaultRouteTablePropagation").equal("enable")
options.should.have.key("PropagationDefaultRouteTableId").match("tgw-rtb-[a-z0-9]+")
options.should.have.key("VpnEcmpSupport").equal("enable")
options.should.have.key("DnsSupport").equal("disable")
#
# Verify we can retrieve it
response = ec2.describe_transit_gateways()
gateways = response["TransitGateways"]
gateways.should.have.length_of(1)
gateways[0].should.have.key("CreationTime")
gateways[0].should.have.key("TransitGatewayArn").equal(
"arn:aws:ec2:us-east-1:{}:transit-gateway/{}".format(
ACCOUNT_ID, gateway["TransitGatewayId"]
)
)
gateways[0]["Options"].should.have.key("AssociationDefaultRouteTableId").equal(
gateways[0]["Options"]["PropagationDefaultRouteTableId"]
)
del gateways[0]["CreationTime"]
del gateways[0]["TransitGatewayArn"]
del gateways[0]["Options"]["AssociationDefaultRouteTableId"]
gateway.should.equal(gateways[0])
@mock_ec2
def test_create_transit_gateway_with_tags():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.create_transit_gateway(
Description="my first gateway",
TagSpecifications=[
{
"ResourceType": "transit-gateway",
"Tags": [
{"Key": "tag1", "Value": "val1"},
{"Key": "tag2", "Value": "val2"},
],
}
],
)
gateway = response["TransitGateway"]
gateway.should.have.key("TransitGatewayId").match("tgw-[a-z0-9]+")
tags = gateway.get("Tags", [])
tags.should.have.length_of(2)
tags.should.contain({"Key": "tag1", "Value": "val1"})
tags.should.contain({"Key": "tag2", "Value": "val2"})
@mock_ec2
def test_delete_transit_gateway():
ec2 = boto3.client("ec2", region_name="us-west-1")
g = ec2.create_transit_gateway(Description="my first gateway")["TransitGateway"]
ec2.describe_transit_gateways()["TransitGateways"].should.have.length_of(1)
ec2.delete_transit_gateway(TransitGatewayId=g["TransitGatewayId"])
ec2.describe_transit_gateways()["TransitGateways"].should.have.length_of(0)
@mock_ec2
def test_modify_transit_gateway():
ec2 = boto3.client("ec2", region_name="us-west-1")
g = ec2.create_transit_gateway(Description="my first gatway")["TransitGateway"]
ec2.describe_transit_gateways()["TransitGateways"].should.have.length_of(1)
ec2.describe_transit_gateways()["TransitGateways"][0]["Description"].should.equal(
"my first gatway"
)
ec2.modify_transit_gateway(
TransitGatewayId=g["TransitGatewayId"], Description="my first gateway"
)
ec2.describe_transit_gateways()["TransitGateways"].should.have.length_of(1)
ec2.describe_transit_gateways()["TransitGateways"][0]["Description"].should.equal(
"my first gateway"
)
@mock_ec2
def test_describe_transit_gateway_vpc_attachments():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.describe_transit_gateway_vpc_attachments()
response.should.have.key("TransitGatewayVpcAttachments").equal([])
@mock_ec2
def test_describe_transit_gateway_attachments():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.describe_transit_gateway_attachments()
response.should.have.key("TransitGatewayAttachments").equal([])
@mock_ec2
def test_create_transit_gateway_vpc_attachment():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.create_transit_gateway_vpc_attachment(
TransitGatewayId="gateway_id", VpcId="some-vpc-id", SubnetIds=["sub1"]
)
attachment = response["TransitGatewayVpcAttachment"]
attachment.should.have.key("TransitGatewayAttachmentId").match("tgw-attach-*")
attachment.should.have.key("TransitGatewayId").equal("gateway_id")
attachment.should.have.key("VpcId").equal("some-vpc-id")
attachment.should.have.key("VpcOwnerId").equal(ACCOUNT_ID)
attachment.should.have.key("State").equal("available")
attachment.should.have.key("SubnetIds").equal(["sub1"])
attachment.should.have.key("Options").equal(
{
"DnsSupport": "enable",
"Ipv6Support": "disable",
"ApplianceModeSupport": "disable",
}
)
attachment.should.have.key("Tags").equal([])
#
# Verify we can retrieve it as a VPC attachment
attachments = ec2.describe_transit_gateway_vpc_attachments()[
"TransitGatewayVpcAttachments"
]
attachments.should.have.length_of(1)
attachments[0].should.have.key("CreationTime")
del attachments[0]["CreationTime"]
attachment.should.equal(attachments[0])
#
# Verify we can retrieve it as a general attachment
attachments = ec2.describe_transit_gateway_attachments()[
"TransitGatewayAttachments"
]
attachments.should.have.length_of(1)
attachments[0].should.have.key("CreationTime")
attachments[0].should.have.key("TransitGatewayOwnerId").equal(ACCOUNT_ID)
attachments[0].should.have.key("ResourceOwnerId").equal(ACCOUNT_ID)
attachments[0].should.have.key("ResourceType").equal("vpc")
attachments[0].should.have.key("ResourceId").equal("some-vpc-id")
attachments[0].should.have.key("State").equal("available")
attachments[0].should.have.key("Tags").equal([])
attachments[0].should.have.key("TransitGatewayAttachmentId").equal(
attachment["TransitGatewayAttachmentId"]
)
attachments[0].should.have.key("TransitGatewayId").equal("gateway_id")
@mock_ec2
def test_describe_transit_gateway_route_tables():
ec2 = boto3.client("ec2", region_name="us-west-1")
response = ec2.describe_transit_gateway_route_tables()
response.should.have.key("TransitGatewayRouteTables").equal([])
@mock_ec2
def test_create_transit_gateway_route_table():
ec2 = boto3.client("ec2", region_name="us-west-1")
tables = ec2.describe_transit_gateway_route_tables()["TransitGatewayRouteTables"]
tables.should.equal([])
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]
table.should.have.key("TransitGatewayRouteTableId").match("tgw-rtb-[0-9a-z]+")
table.should.have.key("TransitGatewayId").equals(gateway_id)
table.should.have.key("State").equals("available")
table.should.have.key("DefaultAssociationRouteTable").equals(False)
table.should.have.key("DefaultPropagationRouteTable").equals(False)
table.should.have.key("CreationTime")
table.should.have.key("Tags").equals([])
tables = ec2.describe_transit_gateway_route_tables()["TransitGatewayRouteTables"]
tables.should.have.length_of(1)
tables[0].should.equal(table)
@mock_ec2
def test_create_transit_gateway_route_table_with_tags():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
response = ec2.create_transit_gateway_route_table(
TransitGatewayId=gateway_id,
TagSpecifications=[
{
"ResourceType": "transit-gateway-route-table",
"Tags": [
{"Key": "tag1", "Value": "val1"},
{"Key": "tag2", "Value": "val2"},
],
}
],
)
table = response["TransitGatewayRouteTable"]
table["Tags"].should.have.length_of(2)
table["Tags"].should.contain({"Key": "tag1", "Value": "val1"})
table["Tags"].should.contain({"Key": "tag2", "Value": "val2"})
@mock_ec2
def test_delete_transit_gateway_route_table():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]
tables = ec2.describe_transit_gateway_route_tables()["TransitGatewayRouteTables"]
tables.should.have.length_of(1)
ec2.delete_transit_gateway_route_table(
TransitGatewayRouteTableId=table["TransitGatewayRouteTableId"]
)
tables = ec2.describe_transit_gateway_route_tables()["TransitGatewayRouteTables"]
tables.should.have.length_of(0)
@mock_ec2
def test_search_transit_gateway_routes_empty():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table_id = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]["TransitGatewayRouteTableId"]
response = ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId=table_id,
Filters=[{"Name": "state", "Values": ["active"]}],
)
response.should.have.key("Routes").equal([])
response.should.have.key("AdditionalRoutesAvailable").equal(False)
@mock_ec2
def test_create_transit_gateway_route():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table_id = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]["TransitGatewayRouteTableId"]
route = ec2.create_transit_gateway_route(
DestinationCidrBlock="0.0.0.0", TransitGatewayRouteTableId=table_id
)["Route"]
route.should.have.key("DestinationCidrBlock").equal("0.0.0.0")
route.should.have.key("Type").equal("TODO")
route.should.have.key("State").equal("active")
@mock_ec2
def test_create_transit_gateway_route_as_blackhole():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table_id = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]["TransitGatewayRouteTableId"]
route = ec2.create_transit_gateway_route(
DestinationCidrBlock="192.168.0.1",
TransitGatewayRouteTableId=table_id,
Blackhole=True,
)["Route"]
route.should.have.key("DestinationCidrBlock").equal("192.168.0.1")
route.should.have.key("Type").equal("TODO")
route.should.have.key("State").equal("blackhole")
@mock_ec2
def test_search_transit_gateway_routes_by_state():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table_id = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]["TransitGatewayRouteTableId"]
ec2.create_transit_gateway_route(
DestinationCidrBlock="192.168.0.0", TransitGatewayRouteTableId=table_id
)
ec2.create_transit_gateway_route(
DestinationCidrBlock="192.168.0.1",
TransitGatewayRouteTableId=table_id,
Blackhole=True,
)
routes = ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId=table_id,
Filters=[{"Name": "state", "Values": ["active"]}],
)["Routes"]
routes.should.equal(
[{"DestinationCidrBlock": "192.168.0.0", "Type": "TODO", "State": "active"}]
)
routes = ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId=table_id,
Filters=[{"Name": "state", "Values": ["blackhole"]}],
)["Routes"]
routes.should.equal(
[{"DestinationCidrBlock": "192.168.0.1", "Type": "TODO", "State": "blackhole"}]
)
routes = ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId=table_id,
Filters=[{"Name": "state", "Values": ["unknown"]}],
)["Routes"]
routes.should.equal([])
@mock_ec2
def test_delete_transit_gateway_route():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
table_id = ec2.create_transit_gateway_route_table(TransitGatewayId=gateway_id)[
"TransitGatewayRouteTable"
]["TransitGatewayRouteTableId"]
ec2.create_transit_gateway_route(
DestinationCidrBlock="192.168.0.0", TransitGatewayRouteTableId=table_id
)
ec2.create_transit_gateway_route(
DestinationCidrBlock="192.168.0.1", TransitGatewayRouteTableId=table_id
)
response = ec2.delete_transit_gateway_route(
DestinationCidrBlock="192.168.0.0", TransitGatewayRouteTableId=table_id
)
response["Route"].should.equal(
{"DestinationCidrBlock": "192.168.0.0", "Type": "TODO", "State": "deleted"}
)
routes = ec2.search_transit_gateway_routes(
TransitGatewayRouteTableId=table_id,
Filters=[{"Name": "state", "Values": ["active"]}],
)["Routes"]
routes.should.equal(
[{"DestinationCidrBlock": "192.168.0.1", "Type": "TODO", "State": "active"}]
)
@mock_ec2
def test_create_transit_gateway_vpc_attachment():
ec2 = boto3.client("ec2", region_name="us-west-1")
gateway_id = ec2.create_transit_gateway(Description="g")["TransitGateway"][
"TransitGatewayId"
]
response = ec2.create_transit_gateway_vpc_attachment(
TransitGatewayId=gateway_id, VpcId="vpc-id", SubnetIds=["sub1"]
)
response.should.have.key("TransitGatewayVpcAttachment")
attachment = response["TransitGatewayVpcAttachment"]
attachment.should.have.key("TransitGatewayAttachmentId").match(
"tgw-attach-[0-9a-z]+"
)
attachment.should.have.key("TransitGatewayId").equal(gateway_id)
attachment.should.have.key("VpcId").equal("vpc-id")
attachment.should.have.key("VpcOwnerId").equal(ACCOUNT_ID)
attachment.should.have.key("SubnetIds").equal(["sub1"])
attachment.should.have.key("State").equal("available")
attachment.should.have.key("Tags").equal([])

View File

@ -0,0 +1,86 @@
from __future__ import unicode_literals
import boto3
import json
import sure # noqa
from moto import (
mock_cloudformation,
mock_ec2,
)
@mock_cloudformation
@mock_ec2
def test_transit_gateway_by_cloudformation_simple():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf_client = boto3.client("cloudformation", "us-east-1")
ec2.describe_transit_gateways()["TransitGateways"].should.have.length_of(0)
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Template for Transit Gateway creation.",
"Resources": {
"ttg": {
"Type": "AWS::EC2::TransitGateway",
"Properties": {"Description": "My CF Gateway"},
}
},
}
template = json.dumps(template)
cf_client.create_stack(StackName="test_stack", TemplateBody=template)
gateways = ec2.describe_transit_gateways()["TransitGateways"]
gateways.should.have.length_of(1)
gateways[0]["TransitGatewayId"].should.match("tgw-[0-9a-z]+")
gateways[0]["State"].should.equal("available")
gateways[0]["Description"].should.equal("My CF Gateway")
gateways[0]["Options"]["AmazonSideAsn"].should.equal(64512)
gateways[0]["Options"]["AutoAcceptSharedAttachments"].should.equal("disable")
gateways[0]["Options"]["DefaultRouteTableAssociation"].should.equal("enable")
# Gateway will only have the OOTB CF tags
gateways[0]["Tags"].should.have.length_of(3)
@mock_cloudformation
@mock_ec2
def test_transit_gateway_by_cloudformation():
ec2 = boto3.client("ec2", region_name="us-east-1")
cf_client = boto3.client("cloudformation", "us-east-1")
ec2.describe_transit_gateways()["TransitGateways"].should.have.length_of(0)
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Template for Transit Gateway creation.",
"Resources": {
"ttg": {
"Type": "AWS::EC2::TransitGateway",
"Properties": {
"Description": "My CF Gateway",
"AmazonSideAsn": 1,
"AutoAcceptSharedAttachments": "enable",
"DefaultRouteTableAssociation": "disable",
"Tags": [{"Key": "foo", "Value": "bar"}],
},
}
},
}
template = json.dumps(template)
cf_client.create_stack(StackName="test_stack", TemplateBody=template)
gateways = ec2.describe_transit_gateways()["TransitGateways"]
gateways.should.have.length_of(1)
gateways[0]["TransitGatewayId"].should.match("tgw-[0-9a-z]+")
gateways[0]["State"].should.equal("available")
gateways[0]["Description"].should.equal("My CF Gateway")
gateways[0]["Options"]["AmazonSideAsn"].should.equal(1)
gateways[0]["Options"]["AutoAcceptSharedAttachments"].should.equal("enable")
gateways[0]["Options"]["DefaultRouteTableAssociation"].should.equal("disable")
tags = gateways[0].get("Tags", {})
tags.should.have.length_of(4)
tags.should.contain({"Key": "foo", "Value": "bar"})
tags.should.contain({"Key": "aws:cloudformation:stack-name", "Value": "test_stack"})
tags.should.contain({"Key": "aws:cloudformation:logical-id", "Value": "ttg"})

View File

@ -2167,6 +2167,53 @@ def test_create_and_list_connections():
) )
@mock_events
def test_create_and_describe_connection():
client = boto3.client("events", "eu-central-1")
client.create_connection(
Name="test",
Description="test description",
AuthorizationType="API_KEY",
AuthParameters={
"ApiKeyAuthParameters": {"ApiKeyName": "test", "ApiKeyValue": "test"}
},
)
description = client.describe_connection(Name="test")
description["Name"].should.equal("test")
description["Description"].should.equal("test description")
description["AuthorizationType"].should.equal("API_KEY")
description["ConnectionState"].should.equal("AUTHORIZED")
description.should.have.key("CreationTime")
@mock_events
def test_delete_connection():
client = boto3.client("events", "eu-central-1")
conns = client.list_connections()["Connections"]
conns.should.have.length_of(0)
client.create_connection(
Name="test",
Description="test description",
AuthorizationType="API_KEY",
AuthParameters={
"ApiKeyAuthParameters": {"ApiKeyName": "test", "ApiKeyValue": "test"}
},
)
conns = client.list_connections()["Connections"]
conns.should.have.length_of(1)
client.delete_connection(Name="test")
conns = client.list_connections()["Connections"]
conns.should.have.length_of(0)
@mock_events @mock_events
def test_create_and_list_api_destinations(): def test_create_and_list_api_destinations():
client = boto3.client("events", "eu-central-1") client = boto3.client("events", "eu-central-1")
@ -2213,6 +2260,75 @@ def test_create_and_list_api_destinations():
) )
@pytest.mark.parametrize(
"key,initial_value,updated_value",
[
("Description", "my aspi dest", "my actual api dest"),
("InvocationEndpoint", "www.google.com", "www.google.cz"),
("InvocationRateLimitPerSecond", 1, 32),
("HttpMethod", "GET", "PATCH"),
],
)
@mock_events
def test_create_and_update_api_destination(key, initial_value, updated_value):
client = boto3.client("events", "eu-central-1")
response = client.create_connection(
Name="test",
Description="test description",
AuthorizationType="API_KEY",
AuthParameters={
"ApiKeyAuthParameters": {"ApiKeyName": "test", "ApiKeyValue": "test"}
},
)
default_params = {
"Name": "test",
"Description": "test-description",
"ConnectionArn": response.get("ConnectionArn"),
"InvocationEndpoint": "www.google.com",
"HttpMethod": "GET",
}
default_params.update({key: initial_value})
client.create_api_destination(**default_params)
destination = client.describe_api_destination(Name="test")
destination[key].should.equal(initial_value)
client.update_api_destination(Name="test", **dict({key: updated_value}))
destination = client.describe_api_destination(Name="test")
destination[key].should.equal(updated_value)
@mock_events
def test_delete_api_destination():
client = boto3.client("events", "eu-central-1")
client.list_api_destinations()["ApiDestinations"].should.have.length_of(0)
response = client.create_connection(
Name="test",
AuthorizationType="API_KEY",
AuthParameters={
"ApiKeyAuthParameters": {"ApiKeyName": "test", "ApiKeyValue": "test"}
},
)
client.create_api_destination(
Name="testdest",
ConnectionArn=response.get("ConnectionArn"),
InvocationEndpoint="www.google.com",
HttpMethod="GET",
)
client.list_api_destinations()["ApiDestinations"].should.have.length_of(1)
client.delete_api_destination(Name="testdest")
client.list_api_destinations()["ApiDestinations"].should.have.length_of(0)
# Scenarios for describe_connection # Scenarios for describe_connection
# Scenario 01: Success # Scenario 01: Success
# Scenario 02: Failure - Connection not present # Scenario 02: Failure - Connection not present