From 020257904e1a0339ddff8bb4a88bc718fa1c8fe5 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Fri, 27 Aug 2021 11:28:10 +0100 Subject: [PATCH] Tech Debt - improve test coverage (#4229) --- moto/ec2/exceptions.py | 25 ++- moto/ssm/models.py | 4 +- tests/test_ec2/test_route_tables.py | 44 +++++ tests/test_ec2/test_vpc_peering.py | 93 ++++++++++ tests/test_ssm/test_ssm_doc_permissions.py | 190 +++++++++++++++++++++ 5 files changed, 341 insertions(+), 15 deletions(-) create mode 100644 tests/test_ssm/test_ssm_doc_permissions.py diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py index 1e8399125..6033387cf 100644 --- a/moto/ec2/exceptions.py +++ b/moto/ec2/exceptions.py @@ -2,9 +2,10 @@ from __future__ import unicode_literals from moto.core.exceptions import RESTError -# DescribeRouteTable has a custom root-tag - vs -# TF complains if the roottag is incorrect -CUSTOM_ERROR_RESPONSE = """ +# EC2 has a custom root-tag - vs +# `terraform destroy` will complain if the roottag is incorrect +# See https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html#api-error-response +EC2_ERROR_RESPONSE = """ @@ -22,6 +23,13 @@ class EC2ClientError(RESTError): # EC2 uses as tag name in the XML response request_id_tag_name = "RequestID" + def __init__(self, *args, **kwargs): + + kwargs.setdefault("template", "custom_response") + self.templates["custom_response"] = EC2_ERROR_RESPONSE + + super(EC2ClientError, self).__init__(*args, **kwargs) + class DependencyViolationError(EC2ClientError): def __init__(self, message): @@ -75,14 +83,9 @@ class InvalidKeyPairFormatError(EC2ClientError): class InvalidVPCIdError(EC2ClientError): def __init__(self, vpc_id): - kwargs = {} - kwargs.setdefault("template", "custom_response") - self.templates["custom_response"] = CUSTOM_ERROR_RESPONSE super(InvalidVPCIdError, self).__init__( - "InvalidVpcID.NotFound", - "VpcID {0} does not exist.".format(vpc_id), - **kwargs + "InvalidVpcID.NotFound", "VpcID {0} does not exist.".format(vpc_id), ) @@ -197,14 +200,10 @@ class InvalidPermissionDuplicateError(EC2ClientError): class InvalidRouteTableIdError(EC2ClientError): def __init__(self, route_table_id): - kwargs = {} - kwargs.setdefault("template", "custom_response") - self.templates["custom_response"] = CUSTOM_ERROR_RESPONSE super(InvalidRouteTableIdError, self).__init__( "InvalidRouteTableID.NotFound", "The routeTable ID '{0}' does not exist".format(route_table_id), - **kwargs ) diff --git a/moto/ssm/models.py b/moto/ssm/models.py index 59d4a1ec8..0bdd5ca10 100644 --- a/moto/ssm/models.py +++ b/moto/ssm/models.py @@ -928,13 +928,13 @@ class SimpleSystemManagerBackend(BaseBackend): permission_type, ): - account_id_regex = re.compile(r"(all|[0-9]{12})") + account_id_regex = re.compile(r"^(all|[0-9]{12})$", re.IGNORECASE) version_regex = re.compile(r"^([$]LATEST|[$]DEFAULT|[$]ALL)$") account_ids_to_add = account_ids_to_add or [] account_ids_to_remove = account_ids_to_remove or [] - if not version_regex.match(shared_document_version): + if shared_document_version and not version_regex.match(shared_document_version): raise ValidationException( f"Value '{shared_document_version}' at 'sharedDocumentVersion' failed to satisfy constraint: " f"Member must satisfy regular expression pattern: ([$]LATEST|[$]DEFAULT|[$]ALL)." diff --git a/tests/test_ec2/test_route_tables.py b/tests/test_ec2/test_route_tables.py index 1ec4fcf6a..7c195235d 100644 --- a/tests/test_ec2/test_route_tables.py +++ b/tests/test_ec2/test_route_tables.py @@ -736,6 +736,50 @@ def test_create_route_tables_with_tags(): route_table.tags.should.have.length_of(1) +@mock_ec2 +def test_create_route_with_egress_only_igw(): + ec2 = boto3.resource("ec2", region_name="eu-central-1") + ec2_client = boto3.client("ec2", region_name="eu-central-1") + + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + eigw = ec2_client.create_egress_only_internet_gateway(VpcId=vpc.id) + eigw_id = eigw["EgressOnlyInternetGateway"]["EgressOnlyInternetGatewayId"] + + route_table = ec2.create_route_table(VpcId=vpc.id) + + ec2_client.create_route( + RouteTableId=route_table.id, EgressOnlyInternetGatewayId=eigw_id + ) + + route_table.reload() + eigw_route = [r for r in route_table.routes if r.destination_cidr_block == "None"][ + 0 + ] + eigw_route.egress_only_internet_gateway_id.should.equal(eigw_id) + eigw_route.state.should.equal("active") + + +@mock_ec2 +def test_create_route_with_unknown_egress_only_igw(): + ec2 = boto3.resource("ec2", region_name="eu-central-1") + ec2_client = boto3.client("ec2", region_name="eu-central-1") + + vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16") + subnet = ec2.create_subnet( + VpcId=vpc.id, CidrBlock="10.0.0.0/24", AvailabilityZone="us-west-2a" + ) + + route_table = ec2.create_route_table(VpcId=vpc.id) + + with pytest.raises(ClientError) as ex: + ec2_client.create_route( + RouteTableId=route_table.id, EgressOnlyInternetGatewayId="eoigw" + ) + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidGatewayID.NotFound") + err["Message"].should.equal("The eigw ID 'eoigw' does not exist") + + @mock_ec2 def test_associate_route_table_by_gateway(): ec2 = boto3.client("ec2", region_name="us-west-1") diff --git a/tests/test_ec2/test_vpc_peering.py b/tests/test_ec2/test_vpc_peering.py index 20f07f724..00454f6aa 100644 --- a/tests/test_ec2/test_vpc_peering.py +++ b/tests/test_ec2/test_vpc_peering.py @@ -117,6 +117,99 @@ def test_vpc_peering_connections_cross_region(): vpc_pcx_apn1.id.should.equal(vpc_pcx_usw1.id) vpc_pcx_apn1.requester_vpc.id.should.equal(vpc_usw1.id) vpc_pcx_apn1.accepter_vpc.id.should.equal(vpc_apn1.id) + # Quick check to verify the options have a default value + accepter_options = vpc_pcx_apn1.accepter_vpc_info["PeeringOptions"] + accepter_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False) + accepter_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False) + accepter_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False) + requester_options = vpc_pcx_apn1.requester_vpc_info["PeeringOptions"] + requester_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False) + requester_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False) + requester_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False) + + +@mock_ec2 +def test_modify_vpc_peering_connections_accepter_only(): + # create vpc in us-west-1 and ap-northeast-1 + ec2_usw1 = boto3.resource("ec2", region_name="us-west-1") + client = boto3.client("ec2", region_name="us-west-1") + vpc_usw1 = ec2_usw1.create_vpc(CidrBlock="10.90.0.0/16") + ec2_apn1 = boto3.resource("ec2", region_name="ap-northeast-1") + vpc_apn1 = ec2_apn1.create_vpc(CidrBlock="10.20.0.0/16") + # create peering + vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection( + VpcId=vpc_usw1.id, PeerVpcId=vpc_apn1.id, PeerRegion="ap-northeast-1" + ) + # + client.modify_vpc_peering_connection_options( + VpcPeeringConnectionId=vpc_pcx_usw1.id, + AccepterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True,}, + ) + # Accepter options are different + vpc_pcx_usw1.reload() + accepter_options = vpc_pcx_usw1.accepter_vpc_info["PeeringOptions"] + accepter_options["AllowDnsResolutionFromRemoteVpc"].should.equal(True) + accepter_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False) + accepter_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False) + # Requester options are untouched + requester_options = vpc_pcx_usw1.requester_vpc_info["PeeringOptions"] + requester_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False) + requester_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False) + requester_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False) + + +@mock_ec2 +def test_modify_vpc_peering_connections_requester_only(): + # create vpc in us-west-1 and ap-northeast-1 + ec2_usw1 = boto3.resource("ec2", region_name="us-west-1") + client = boto3.client("ec2", region_name="us-west-1") + vpc_usw1 = ec2_usw1.create_vpc(CidrBlock="10.90.0.0/16") + ec2_apn1 = boto3.resource("ec2", region_name="ap-northeast-1") + vpc_apn1 = ec2_apn1.create_vpc(CidrBlock="10.20.0.0/16") + # create peering + vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection( + VpcId=vpc_usw1.id, PeerVpcId=vpc_apn1.id, PeerRegion="ap-northeast-1" + ) + # + client.modify_vpc_peering_connection_options( + VpcPeeringConnectionId=vpc_pcx_usw1.id, + RequesterPeeringConnectionOptions={ + "AllowEgressFromLocalVpcToRemoteClassicLink": True, + }, + ) + # Requester options are different + vpc_pcx_usw1.reload() + requester_options = vpc_pcx_usw1.requester_vpc_info["PeeringOptions"] + requester_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False) + requester_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False) + requester_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(True) + # Accepter options are untouched + accepter_options = vpc_pcx_usw1.accepter_vpc_info["PeeringOptions"] + accepter_options["AllowDnsResolutionFromRemoteVpc"].should.equal(False) + accepter_options["AllowEgressFromLocalClassicLinkToRemoteVpc"].should.equal(False) + accepter_options["AllowEgressFromLocalVpcToRemoteClassicLink"].should.equal(False) + + +@mock_ec2 +def test_modify_vpc_peering_connections_unknown_vpc(): + # create vpc in us-west-1 and ap-northeast-1 + ec2_usw1 = boto3.resource("ec2", region_name="us-west-1") + client = boto3.client("ec2", region_name="us-west-1") + vpc_usw1 = ec2_usw1.create_vpc(CidrBlock="10.90.0.0/16") + ec2_apn1 = boto3.resource("ec2", region_name="ap-northeast-1") + vpc_apn1 = ec2_apn1.create_vpc(CidrBlock="10.20.0.0/16") + # create peering + vpc_pcx_usw1 = ec2_usw1.create_vpc_peering_connection( + VpcId=vpc_usw1.id, PeerVpcId=vpc_apn1.id, PeerRegion="ap-northeast-1" + ) + # + with pytest.raises(ClientError) as ex: + client.modify_vpc_peering_connection_options( + VpcPeeringConnectionId="vpx-unknown", RequesterPeeringConnectionOptions={} + ) + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidVpcPeeringConnectionId.NotFound") + err["Message"].should.equal("VpcPeeringConnectionID vpx-unknown does not exist.") @mock_ec2 diff --git a/tests/test_ssm/test_ssm_doc_permissions.py b/tests/test_ssm/test_ssm_doc_permissions.py new file mode 100644 index 000000000..eaf43e7b4 --- /dev/null +++ b/tests/test_ssm/test_ssm_doc_permissions.py @@ -0,0 +1,190 @@ +import boto3 +import pytest +import yaml + +from botocore.exceptions import ClientError +from moto import mock_ssm +from .test_ssm_docs import _get_yaml_template + + +@mock_ssm +def test_describe_document_permissions_unknown_document(): + client = boto3.client("ssm", region_name="us-east-1") + + with pytest.raises(ClientError) as ex: + client.describe_document_permission( + Name="UnknownDocument", PermissionType="Share" + ) + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidDocument") + err["Message"].should.equal("The specified document does not exist.") + + +def get_client(): + template_file = _get_yaml_template() + json_doc = yaml.safe_load(template_file) + client = boto3.client("ssm", region_name="us-east-1") + client.create_document( + Content=yaml.dump(json_doc), + Name="TestDocument", + DocumentType="Command", + DocumentFormat="YAML", + VersionName="Base", + ) + return client + + +@mock_ssm +def test_describe_document_permissions_initial(): + client = get_client() + + res = client.describe_document_permission( + Name="TestDocument", PermissionType="Share" + ) + res.should.have.key("AccountIds").equal([]) + res.should.have.key("AccountSharingInfoList").equal([]) + + +@pytest.mark.parametrize( + "ids", + [["111111111111"], ["all"], ["All"], ["111111111111", "222222222222"]], + ids=["one_value", "all", "All", "multiple_values"], +) +@mock_ssm +def test_modify_document_permission_add_account_id(ids): + client = get_client() + client.modify_document_permission( + Name="TestDocument", PermissionType="Share", AccountIdsToAdd=ids + ) + + res = client.describe_document_permission( + Name="TestDocument", PermissionType="Share" + ) + res.should.have.key("AccountIds") + set(res["AccountIds"]).should.equal(set(ids)) + res.should.have.key("AccountSharingInfoList").length_of(len(ids)) + for entry in [{"AccountId": _id} for _id in ids]: + res["AccountSharingInfoList"].should.contain(entry) + + +@pytest.mark.parametrize( + "initial,to_remove", + [ + (["all"], ["all"]), + (["111111111111"], ["111111111111"]), + (["111111111111", "222222222222"], ["222222222222"]), + ( + ["111111111111", "222222222222", "333333333333"], + ["111111111111", "333333333333"], + ), + ], + ids=["all", "one_value", "multiple_initials", "multiple_to_remove"], +) +@mock_ssm +def test_modify_document_permission_remove_account_id(initial, to_remove): + client = get_client() + client.modify_document_permission( + Name="TestDocument", PermissionType="Share", AccountIdsToAdd=initial + ) + client.modify_document_permission( + Name="TestDocument", PermissionType="Share", AccountIdsToRemove=to_remove + ) + + res = client.describe_document_permission( + Name="TestDocument", PermissionType="Share" + ) + res.should.have.key("AccountIds") + expected_new_list = set([x for x in initial if x not in to_remove]) + set(res["AccountIds"]).should.equal(expected_new_list) + res.should.have.key("AccountSharingInfoList").equal( + [{"AccountId": _id} for _id in expected_new_list] + ) + + +@mock_ssm +def test_fail_modify_document_permission_wrong_permission_type(): + client = get_client() + with pytest.raises(ClientError) as ex: + client.modify_document_permission( + Name="TestDocument", PermissionType="WrongValue", AccountIdsToAdd=[] + ) + err = ex.value.response["Error"] + err["Code"].should.equal("InvalidPermissionType") + err["Message"].should.match(r"Member must satisfy enum value set: \[Share\]") + + +@mock_ssm +def test_fail_modify_document_permission_wrong_document_version(): + client = get_client() + with pytest.raises(ClientError) as ex: + client.modify_document_permission( + Name="TestDocument", + PermissionType="Share", + SharedDocumentVersion="unknown", + AccountIdsToAdd=[], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.match(r"Member must satisfy regular expression pattern") + + +@pytest.mark.parametrize( + "value", + [["alll"], ["1234"], ["1234123412341234"], ["account_id"]], + ids=["all?", "too_short", "too_long", "no-digits"], +) +@mock_ssm +def test_fail_modify_document_permission_add_invalid_account_ids(value): + client = get_client() + with pytest.raises(ClientError) as ex: + client.modify_document_permission( + Name="TestDocument", PermissionType="Share", AccountIdsToAdd=value + ) + err = ex.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.match(r"Member must satisfy regular expression pattern:") + + +@pytest.mark.parametrize( + "value", + [["alll"], ["1234"], ["1234123412341234"], ["account_id"]], + ids=["all?", "too_short", "too_long", "no-digits"], +) +@mock_ssm +def test_fail_modify_document_permission_remove_invalid_account_ids(value): + client = get_client() + with pytest.raises(ClientError) as ex: + client.modify_document_permission( + Name="TestDocument", PermissionType="Share", AccountIdsToRemove=value + ) + err = ex.value.response["Error"] + err["Code"].should.equal("ValidationException") + err["Message"].should.match(r"Member must satisfy regular expression pattern:") + + +@mock_ssm +def test_fail_modify_document_permission_add_all_and_specific(): + client = get_client() + with pytest.raises(ClientError) as ex: + client.modify_document_permission( + Name="TestDocument", + PermissionType="Share", + AccountIdsToAdd=["all", "123412341234"], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("DocumentPermissionLimit") + err["Message"].should.equal("Accounts can either be all or a group of AWS accounts") + + +@mock_ssm +def test_fail_modify_document_permission_remove_all_and_specific(): + client = get_client() + with pytest.raises(ClientError) as ex: + client.modify_document_permission( + Name="TestDocument", + PermissionType="Share", + AccountIdsToRemove=["all", "123412341234"], + ) + err = ex.value.response["Error"] + err["Code"].should.equal("DocumentPermissionLimit") + err["Message"].should.equal("Accounts can either be all or a group of AWS accounts")