diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py index e978992b1..2135bee39 100644 --- a/moto/ec2/exceptions.py +++ b/moto/ec2/exceptions.py @@ -749,6 +749,14 @@ class GenericInvalidParameterValueError(EC2ClientError): ) +class InvalidParameter(EC2ClientError): + def __init__(self, message: str): + super().__init__( + "InvalidParameter", + message, + ) + + class InvalidSubnetCidrBlockAssociationID(EC2ClientError): def __init__(self, association_id: str): super().__init__( diff --git a/moto/ec2/models/subnets.py b/moto/ec2/models/subnets.py index 9b8c0d6b5..f80df1585 100644 --- a/moto/ec2/models/subnets.py +++ b/moto/ec2/models/subnets.py @@ -317,8 +317,9 @@ class SubnetBackend: ipv6_cidr_block: Optional[str] = None, availability_zone: Optional[str] = None, availability_zone_id: Optional[str] = None, - tags: Optional[List[Dict[str, str]]] = None, + tags: Optional[Dict[str, Dict[str, str]]] = None, ) -> Subnet: + subnet_id = random_subnet_id() # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined] @@ -400,8 +401,8 @@ class SubnetBackend: assign_ipv6_address_on_creation=False, ) - for tag in tags or []: - subnet.add_tag(tag["Key"], tag["Value"]) + for k, v in tags.get("subnet", {}).items() if tags else []: + subnet.add_tag(k, v) # AWS associates a new subnet with the default Network ACL self.associate_default_network_acl_with_subnet(subnet_id, vpc_id) # type: ignore[attr-defined] diff --git a/moto/ec2/responses/_base_response.py b/moto/ec2/responses/_base_response.py index 5529baea3..05b2e698b 100644 --- a/moto/ec2/responses/_base_response.py +++ b/moto/ec2/responses/_base_response.py @@ -1,6 +1,6 @@ -from typing import Any, Dict +from typing import Any, Dict, Optional from moto.core.responses import BaseResponse -from ..exceptions import EmptyTagSpecError +from ..exceptions import EmptyTagSpecError, InvalidParameter from ..utils import convert_tag_spec @@ -17,15 +17,32 @@ class EC2BaseResponse(BaseResponse): # return {x1: y1, ...} return {f["Name"]: f["Value"] for f in _filters} - def _parse_tag_specification(self) -> Dict[str, Dict[str, str]]: + def _parse_tag_specification( + self, expected_type: Optional[str] = None + ) -> Dict[str, Dict[str, str]]: # [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}] - tag_spec_set = self._get_multi_param("TagSpecification") + tag_spec_set = self._get_multi_param( + "TagSpecification", skip_result_conversion=True + ) if not tag_spec_set: - tag_spec_set = self._get_multi_param("TagSpecifications") - # If we do not pass any Tags, this method will convert this to [_type] instead - if isinstance(tag_spec_set, list) and any( - [isinstance(spec, str) for spec in tag_spec_set] - ): + tag_spec_set = self._get_multi_param( + "TagSpecifications", skip_result_conversion=True + ) + if not tag_spec_set: + return {} + + tags_dict = ( + tag_spec_set[0] if isinstance(tag_spec_set, list) else tag_spec_set + ) # awscli allows for a json string to be passed and it should be allowed + if "ResourceType" not in tags_dict: + raise InvalidParameter("Tag specification resource type must have a value") + if expected_type and tags_dict["ResourceType"] != expected_type: + raise InvalidParameter( + f"'{tags_dict['ResourceType']}' is not a valid taggable resource type for this operation." + ) + if "Tag" not in tags_dict: + if tags_dict.get("ResourceType") == "subnet": + raise InvalidParameter("Tag specification must have at least one tag") raise EmptyTagSpecError - # {_type: {k: v, ..}} + return convert_tag_spec(tag_spec_set) diff --git a/moto/ec2/responses/subnets.py b/moto/ec2/responses/subnets.py index 139882cca..365d8c27b 100644 --- a/moto/ec2/responses/subnets.py +++ b/moto/ec2/responses/subnets.py @@ -11,9 +11,7 @@ class Subnets(EC2BaseResponse): ipv6_cidr_block = self._get_param("Ipv6CidrBlock") availability_zone = self._get_param("AvailabilityZone") availability_zone_id = self._get_param("AvailabilityZoneId") - tags = self._get_multi_param("TagSpecification") - if tags: - tags = tags[0].get("Tag") + tags = self._parse_tag_specification("subnet") if not availability_zone and not availability_zone_id: availability_zone = random.choice( diff --git a/tests/test_ec2/test_tags.py b/tests/test_ec2/test_tags.py index 02b571194..158cd98ec 100644 --- a/tests/test_ec2/test_tags.py +++ b/tests/test_ec2/test_tags.py @@ -468,6 +468,67 @@ def test_retrieve_resource_with_multiple_tags(): assert blue_instances == [blue] +@mock_ec2 +def test_ec2_validate_subnet_tags(): + client = boto3.client("ec2", region_name="us-west-1") + + # create vpc + vpc = client.create_vpc(CidrBlock="10.0.0.0/16") + vpc_id = vpc["Vpc"]["VpcId"] + + with pytest.raises(ClientError) as ex: + client.create_subnet( + VpcId=vpc_id, + CidrBlock="10.0.0.1/24", + TagSpecifications=[{"Tags": [{"Key": "TEST_TAG", "Value": "TEST_VALUE"}]}], + ) + assert ex.value.response["Error"]["Code"] == "InvalidParameter" + assert ( + ex.value.response["Error"]["Message"] + == "Tag specification resource type must have a value" + ) + + with pytest.raises(ClientError) as ex: + client.create_subnet( + VpcId=vpc_id, + CidrBlock="10.0.0.1/24", + TagSpecifications=[{"ResourceType": "subnet"}], + ) + assert ex.value.response["Error"]["Code"] == "InvalidParameter" + assert ( + ex.value.response["Error"]["Message"] + == "Tag specification must have at least one tag" + ) + + with pytest.raises(ClientError) as ex: + client.create_subnet( + VpcId=vpc_id, + CidrBlock="10.0.0.1/24", + TagSpecifications=[ + { + "ResourceType": "snapshot", + "Tags": [{"Key": "TEST_TAG", "Value": "TEST_VALUE"}], + } + ], + ) + assert ex.value.response["Error"]["Code"] == "InvalidParameter" + assert ( + ex.value.response["Error"]["Message"] + == "'snapshot' is not a valid taggable resource type for this operation." + ) + + client.create_subnet( + VpcId=vpc_id, + CidrBlock="10.0.0.1/24", + TagSpecifications=[ + { + "ResourceType": "subnet", + "Tags": [{"Key": "TEST_TAG", "Value": "TEST_VALUE"}], + } + ], + ) + + def get_filter(tag_val): return [ {"Name": "tag-key", "Values": ["application"]},