Tech Debt - improve test coverage (#4229)

This commit is contained in:
Bert Blommers 2021-08-27 11:28:10 +01:00 committed by GitHub
parent 11a37c357b
commit 020257904e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 341 additions and 15 deletions

View File

@ -2,9 +2,10 @@ from __future__ import unicode_literals
from moto.core.exceptions import RESTError
# DescribeRouteTable has a custom root-tag - <Response> vs <ErrorResponse>
# TF complains if the roottag is incorrect
CUSTOM_ERROR_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
# EC2 has a custom root-tag - <Response> vs <ErrorResponse>
# `terraform destroy` will complain if the roottag is incorrect
# See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html#api-error-response
EC2_ERROR_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Errors>
<Error>
@ -22,6 +23,13 @@ class EC2ClientError(RESTError):
# EC2 uses <RequestID> as tag name in the XML response
request_id_tag_name = "RequestID"
def __init__(self, *args, **kwargs):
kwargs.setdefault("template", "custom_response")
self.templates["custom_response"] = EC2_ERROR_RESPONSE
super(EC2ClientError, self).__init__(*args, **kwargs)
class DependencyViolationError(EC2ClientError):
def __init__(self, message):
@ -75,14 +83,9 @@ class InvalidKeyPairFormatError(EC2ClientError):
class InvalidVPCIdError(EC2ClientError):
def __init__(self, vpc_id):
kwargs = {}
kwargs.setdefault("template", "custom_response")
self.templates["custom_response"] = CUSTOM_ERROR_RESPONSE
super(InvalidVPCIdError, self).__init__(
"InvalidVpcID.NotFound",
"VpcID {0} does not exist.".format(vpc_id),
**kwargs
"InvalidVpcID.NotFound", "VpcID {0} does not exist.".format(vpc_id),
)
@ -197,14 +200,10 @@ class InvalidPermissionDuplicateError(EC2ClientError):
class InvalidRouteTableIdError(EC2ClientError):
def __init__(self, route_table_id):
kwargs = {}
kwargs.setdefault("template", "custom_response")
self.templates["custom_response"] = CUSTOM_ERROR_RESPONSE
super(InvalidRouteTableIdError, self).__init__(
"InvalidRouteTableID.NotFound",
"The routeTable ID '{0}' does not exist".format(route_table_id),
**kwargs
)

View File

@ -928,13 +928,13 @@ class SimpleSystemManagerBackend(BaseBackend):
permission_type,
):
account_id_regex = re.compile(r"(all|[0-9]{12})")
account_id_regex = re.compile(r"^(all|[0-9]{12})$", re.IGNORECASE)
version_regex = re.compile(r"^([$]LATEST|[$]DEFAULT|[$]ALL)$")
account_ids_to_add = account_ids_to_add or []
account_ids_to_remove = account_ids_to_remove or []
if not version_regex.match(shared_document_version):
if shared_document_version and not version_regex.match(shared_document_version):
raise ValidationException(
f"Value '{shared_document_version}' at 'sharedDocumentVersion' failed to satisfy constraint: "
f"Member must satisfy regular expression pattern: ([$]LATEST|[$]DEFAULT|[$]ALL)."

View File

@ -736,6 +736,50 @@ def test_create_route_tables_with_tags():
route_table.tags.should.have.length_of(1)
@mock_ec2
def test_create_route_with_egress_only_igw():
ec2 = boto3.resource("ec2", region_name="eu-central-1")
ec2_client = boto3.client("ec2", region_name="eu-central-1")
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
eigw = ec2_client.create_egress_only_internet_gateway(VpcId=vpc.id)
eigw_id = eigw["EgressOnlyInternetGateway"]["EgressOnlyInternetGatewayId"]
route_table = ec2.create_route_table(VpcId=vpc.id)
ec2_client.create_route(
RouteTableId=route_table.id, EgressOnlyInternetGatewayId=eigw_id
)
route_table.reload()
eigw_route = [r for r in route_table.routes if r.destination_cidr_block == "None"][
0
]
eigw_route.egress_only_internet_gateway_id.should.equal(eigw_id)
eigw_route.state.should.equal("active")
@mock_ec2
def test_create_route_with_unknown_egress_only_igw():
ec2 = boto3.resource("ec2", region_name="eu-central-1")
ec2_client = boto3.client("ec2", region_name="eu-central-1")
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
subnet = ec2.create_subnet(
VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a"
)
route_table = ec2.create_route_table(VpcId=vpc.id)
with pytest.raises(ClientError) as ex:
ec2_client.create_route(
RouteTableId=route_table.id, EgressOnlyInternetGatewayId="eoigw"
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidGatewayID.NotFound")
err["Message"].should.equal("The eigw ID 'eoigw' does not exist")
@mock_ec2
def test_associate_route_table_by_gateway():
ec2 = boto3.client("ec2", region_name="us-west-1")

View File

@ -117,6 +117,99 @@ def test_vpc_peering_connections_cross_region():
vpc_pcx_apn1.id.should.equal(vpc_pcx_usw1.id)
vpc_pcx_apn1.requester_vpc.id.should.equal(vpc_usw1.id)
vpc_pcx_apn1.accepter_vpc.id.should.equal(vpc_apn1.id)
# Quick check to verify the options have a default value
accepter_options = vpc_pcx_apn1.accepter_vpc_info["PeeringOptions"]
accepter_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False)
accepter_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False)
accepter_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False)
requester_options = vpc_pcx_apn1.requester_vpc_info["PeeringOptions"]
requester_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False)
requester_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False)
requester_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False)
@mock_ec2
def test_modify_vpc_peering_connections_accepter_only():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource("ec2", region_name="us-west-1")
client = boto3.client("ec2", region_name="us-west-1")
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock="10.90.0.0/16")
ec2_apn1 = boto3.resource("ec2", region_name="ap-northeast-1")
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock="10.20.0.0/16")
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id, PeerVpcId=vpc_apn1.id, PeerRegion="ap-northeast-1"
)
#
client.modify_vpc_peering_connection_options(
VpcPeeringConnectionId=vpc_pcx_usw1.id,
AccepterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True,},
)
# Accepter options are different
vpc_pcx_usw1.reload()
accepter_options = vpc_pcx_usw1.accepter_vpc_info["PeeringOptions"]
accepter_options["AllowDnsResolutionFromRemoteVpc"].should.equal(True)
accepter_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False)
accepter_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False)
# Requester options are untouched
requester_options = vpc_pcx_usw1.requester_vpc_info["PeeringOptions"]
requester_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False)
requester_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False)
requester_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False)
@mock_ec2
def test_modify_vpc_peering_connections_requester_only():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource("ec2", region_name="us-west-1")
client = boto3.client("ec2", region_name="us-west-1")
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock="10.90.0.0/16")
ec2_apn1 = boto3.resource("ec2", region_name="ap-northeast-1")
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock="10.20.0.0/16")
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id, PeerVpcId=vpc_apn1.id, PeerRegion="ap-northeast-1"
)
#
client.modify_vpc_peering_connection_options(
VpcPeeringConnectionId=vpc_pcx_usw1.id,
RequesterPeeringConnectionOptions={
"AllowEgressFromLocalVpcToRemoteClassicLink": True,
},
)
# Requester options are different
vpc_pcx_usw1.reload()
requester_options = vpc_pcx_usw1.requester_vpc_info["PeeringOptions"]
requester_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False)
requester_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False)
requester_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(True)
# Accepter options are untouched
accepter_options = vpc_pcx_usw1.accepter_vpc_info["PeeringOptions"]
accepter_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False)
accepter_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False)
accepter_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False)
@mock_ec2
def test_modify_vpc_peering_connections_unknown_vpc():
# create vpc in us-west-1 and ap-northeast-1
ec2_usw1 = boto3.resource("ec2", region_name="us-west-1")
client = boto3.client("ec2", region_name="us-west-1")
vpc_usw1 = ec2_usw1.create_vpc(CidrBlock="10.90.0.0/16")
ec2_apn1 = boto3.resource("ec2", region_name="ap-northeast-1")
vpc_apn1 = ec2_apn1.create_vpc(CidrBlock="10.20.0.0/16")
# create peering
vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection(
VpcId=vpc_usw1.id, PeerVpcId=vpc_apn1.id, PeerRegion="ap-northeast-1"
)
#
with pytest.raises(ClientError) as ex:
client.modify_vpc_peering_connection_options(
VpcPeeringConnectionId="vpx-unknown", RequesterPeeringConnectionOptions={}
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidVpcPeeringConnectionId.NotFound")
err["Message"].should.equal("VpcPeeringConnectionID vpx-unknown does not exist.")
@mock_ec2

View File

@ -0,0 +1,190 @@
import boto3
import pytest
import yaml
from botocore.exceptions import ClientError
from moto import mock_ssm
from .test_ssm_docs import _get_yaml_template
@mock_ssm
def test_describe_document_permissions_unknown_document():
client = boto3.client("ssm", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.describe_document_permission(
Name="UnknownDocument", PermissionType="Share"
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidDocument")
err["Message"].should.equal("The specified document does not exist.")
def get_client():
template_file = _get_yaml_template()
json_doc = yaml.safe_load(template_file)
client = boto3.client("ssm", region_name="us-east-1")
client.create_document(
Content=yaml.dump(json_doc),
Name="TestDocument",
DocumentType="Command",
DocumentFormat="YAML",
VersionName="Base",
)
return client
@mock_ssm
def test_describe_document_permissions_initial():
client = get_client()
res = client.describe_document_permission(
Name="TestDocument", PermissionType="Share"
)
res.should.have.key("AccountIds").equal([])
res.should.have.key("AccountSharingInfoList").equal([])
@pytest.mark.parametrize(
"ids",
[["111111111111"], ["all"], ["All"], ["111111111111", "222222222222"]],
ids=["one_value", "all", "All", "multiple_values"],
)
@mock_ssm
def test_modify_document_permission_add_account_id(ids):
client = get_client()
client.modify_document_permission(
Name="TestDocument", PermissionType="Share", AccountIdsToAdd=ids
)
res = client.describe_document_permission(
Name="TestDocument", PermissionType="Share"
)
res.should.have.key("AccountIds")
set(res["AccountIds"]).should.equal(set(ids))
res.should.have.key("AccountSharingInfoList").length_of(len(ids))
for entry in [{"AccountId": _id} for _id in ids]:
res["AccountSharingInfoList"].should.contain(entry)
@pytest.mark.parametrize(
"initial,to_remove",
[
(["all"], ["all"]),
(["111111111111"], ["111111111111"]),
(["111111111111", "222222222222"], ["222222222222"]),
(
["111111111111", "222222222222", "333333333333"],
["111111111111", "333333333333"],
),
],
ids=["all", "one_value", "multiple_initials", "multiple_to_remove"],
)
@mock_ssm
def test_modify_document_permission_remove_account_id(initial, to_remove):
client = get_client()
client.modify_document_permission(
Name="TestDocument", PermissionType="Share", AccountIdsToAdd=initial
)
client.modify_document_permission(
Name="TestDocument", PermissionType="Share", AccountIdsToRemove=to_remove
)
res = client.describe_document_permission(
Name="TestDocument", PermissionType="Share"
)
res.should.have.key("AccountIds")
expected_new_list = set([x for x in initial if x not in to_remove])
set(res["AccountIds"]).should.equal(expected_new_list)
res.should.have.key("AccountSharingInfoList").equal(
[{"AccountId": _id} for _id in expected_new_list]
)
@mock_ssm
def test_fail_modify_document_permission_wrong_permission_type():
client = get_client()
with pytest.raises(ClientError) as ex:
client.modify_document_permission(
Name="TestDocument", PermissionType="WrongValue", AccountIdsToAdd=[]
)
err = ex.value.response["Error"]
err["Code"].should.equal("InvalidPermissionType")
err["Message"].should.match(r"Member must satisfy enum value set: \[Share\]")
@mock_ssm
def test_fail_modify_document_permission_wrong_document_version():
client = get_client()
with pytest.raises(ClientError) as ex:
client.modify_document_permission(
Name="TestDocument",
PermissionType="Share",
SharedDocumentVersion="unknown",
AccountIdsToAdd=[],
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.match(r"Member must satisfy regular expression pattern")
@pytest.mark.parametrize(
"value",
[["alll"], ["1234"], ["1234123412341234"], ["account_id"]],
ids=["all?", "too_short", "too_long", "no-digits"],
)
@mock_ssm
def test_fail_modify_document_permission_add_invalid_account_ids(value):
client = get_client()
with pytest.raises(ClientError) as ex:
client.modify_document_permission(
Name="TestDocument", PermissionType="Share", AccountIdsToAdd=value
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.match(r"Member must satisfy regular expression pattern:")
@pytest.mark.parametrize(
"value",
[["alll"], ["1234"], ["1234123412341234"], ["account_id"]],
ids=["all?", "too_short", "too_long", "no-digits"],
)
@mock_ssm
def test_fail_modify_document_permission_remove_invalid_account_ids(value):
client = get_client()
with pytest.raises(ClientError) as ex:
client.modify_document_permission(
Name="TestDocument", PermissionType="Share", AccountIdsToRemove=value
)
err = ex.value.response["Error"]
err["Code"].should.equal("ValidationException")
err["Message"].should.match(r"Member must satisfy regular expression pattern:")
@mock_ssm
def test_fail_modify_document_permission_add_all_and_specific():
client = get_client()
with pytest.raises(ClientError) as ex:
client.modify_document_permission(
Name="TestDocument",
PermissionType="Share",
AccountIdsToAdd=["all", "123412341234"],
)
err = ex.value.response["Error"]
err["Code"].should.equal("DocumentPermissionLimit")
err["Message"].should.equal("Accounts can either be all or a group of AWS accounts")
@mock_ssm
def test_fail_modify_document_permission_remove_all_and_specific():
client = get_client()
with pytest.raises(ClientError) as ex:
client.modify_document_permission(
Name="TestDocument",
PermissionType="Share",
AccountIdsToRemove=["all", "123412341234"],
)
err = ex.value.response["Error"]
err["Code"].should.equal("DocumentPermissionLimit")
err["Message"].should.equal("Accounts can either be all or a group of AWS accounts")