EC2: Add validations for create_subnet tags (#6860)

This commit is contained in:
Cristopher Pinzón 2023-10-03 16:23:07 -05:00 committed by GitHub
parent 24d9ea61ce
commit 197e8710af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 101 additions and 16 deletions

View File

@ -749,6 +749,14 @@ class GenericInvalidParameterValueError(EC2ClientError):
) )
class InvalidParameter(EC2ClientError):
def __init__(self, message: str):
super().__init__(
"InvalidParameter",
message,
)
class InvalidSubnetCidrBlockAssociationID(EC2ClientError): class InvalidSubnetCidrBlockAssociationID(EC2ClientError):
def __init__(self, association_id: str): def __init__(self, association_id: str):
super().__init__( super().__init__(

View File

@ -317,8 +317,9 @@ class SubnetBackend:
ipv6_cidr_block: Optional[str] = None, ipv6_cidr_block: Optional[str] = None,
availability_zone: Optional[str] = None, availability_zone: Optional[str] = None,
availability_zone_id: Optional[str] = None, availability_zone_id: Optional[str] = None,
tags: Optional[List[Dict[str, str]]] = None, tags: Optional[Dict[str, Dict[str, str]]] = None,
) -> Subnet: ) -> Subnet:
subnet_id = random_subnet_id() subnet_id = random_subnet_id()
# Validate VPC exists and the supplied CIDR block is a subnet of the VPC's # Validate VPC exists and the supplied CIDR block is a subnet of the VPC's
vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined] vpc = self.get_vpc(vpc_id) # type: ignore[attr-defined]
@ -400,8 +401,8 @@ class SubnetBackend:
assign_ipv6_address_on_creation=False, assign_ipv6_address_on_creation=False,
) )
for tag in tags or []: for k, v in tags.get("subnet", {}).items() if tags else []:
subnet.add_tag(tag["Key"], tag["Value"]) subnet.add_tag(k, v)
# AWS associates a new subnet with the default Network ACL # AWS associates a new subnet with the default Network ACL
self.associate_default_network_acl_with_subnet(subnet_id, vpc_id) # type: ignore[attr-defined] self.associate_default_network_acl_with_subnet(subnet_id, vpc_id) # type: ignore[attr-defined]

View File

@ -1,6 +1,6 @@
from typing import Any, Dict from typing import Any, Dict, Optional
from moto.core.responses import BaseResponse from moto.core.responses import BaseResponse
from ..exceptions import EmptyTagSpecError from ..exceptions import EmptyTagSpecError, InvalidParameter
from ..utils import convert_tag_spec from ..utils import convert_tag_spec
@ -17,15 +17,32 @@ class EC2BaseResponse(BaseResponse):
# return {x1: y1, ...} # return {x1: y1, ...}
return {f["Name"]: f["Value"] for f in _filters} return {f["Name"]: f["Value"] for f in _filters}
def _parse_tag_specification(self) -> Dict[str, Dict[str, str]]: def _parse_tag_specification(
self, expected_type: Optional[str] = None
) -> Dict[str, Dict[str, str]]:
# [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}] # [{"ResourceType": _type, "Tag": [{"Key": k, "Value": v}, ..]}]
tag_spec_set = self._get_multi_param("TagSpecification") tag_spec_set = self._get_multi_param(
"TagSpecification", skip_result_conversion=True
)
if not tag_spec_set: if not tag_spec_set:
tag_spec_set = self._get_multi_param("TagSpecifications") tag_spec_set = self._get_multi_param(
# If we do not pass any Tags, this method will convert this to [_type] instead "TagSpecifications", skip_result_conversion=True
if isinstance(tag_spec_set, list) and any( )
[isinstance(spec, str) for spec in tag_spec_set] if not tag_spec_set:
): return {}
tags_dict = (
tag_spec_set[0] if isinstance(tag_spec_set, list) else tag_spec_set
) # awscli allows for a json string to be passed and it should be allowed
if "ResourceType" not in tags_dict:
raise InvalidParameter("Tag specification resource type must have a value")
if expected_type and tags_dict["ResourceType"] != expected_type:
raise InvalidParameter(
f"'{tags_dict['ResourceType']}' is not a valid taggable resource type for this operation."
)
if "Tag" not in tags_dict:
if tags_dict.get("ResourceType") == "subnet":
raise InvalidParameter("Tag specification must have at least one tag")
raise EmptyTagSpecError raise EmptyTagSpecError
# {_type: {k: v, ..}}
return convert_tag_spec(tag_spec_set) return convert_tag_spec(tag_spec_set)

View File

@ -11,9 +11,7 @@ class Subnets(EC2BaseResponse):
ipv6_cidr_block = self._get_param("Ipv6CidrBlock") ipv6_cidr_block = self._get_param("Ipv6CidrBlock")
availability_zone = self._get_param("AvailabilityZone") availability_zone = self._get_param("AvailabilityZone")
availability_zone_id = self._get_param("AvailabilityZoneId") availability_zone_id = self._get_param("AvailabilityZoneId")
tags = self._get_multi_param("TagSpecification") tags = self._parse_tag_specification("subnet")
if tags:
tags = tags[0].get("Tag")
if not availability_zone and not availability_zone_id: if not availability_zone and not availability_zone_id:
availability_zone = random.choice( availability_zone = random.choice(

View File

@ -468,6 +468,67 @@ def test_retrieve_resource_with_multiple_tags():
assert blue_instances == [blue] assert blue_instances == [blue]
@mock_ec2
def test_ec2_validate_subnet_tags():
client = boto3.client("ec2", region_name="us-west-1")
# create vpc
vpc = client.create_vpc(CidrBlock="10.0.0.0/16")
vpc_id = vpc["Vpc"]["VpcId"]
with pytest.raises(ClientError) as ex:
client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.0.1/24",
TagSpecifications=[{"Tags": [{"Key": "TEST_TAG", "Value": "TEST_VALUE"}]}],
)
assert ex.value.response["Error"]["Code"] == "InvalidParameter"
assert (
ex.value.response["Error"]["Message"]
== "Tag specification resource type must have a value"
)
with pytest.raises(ClientError) as ex:
client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.0.1/24",
TagSpecifications=[{"ResourceType": "subnet"}],
)
assert ex.value.response["Error"]["Code"] == "InvalidParameter"
assert (
ex.value.response["Error"]["Message"]
== "Tag specification must have at least one tag"
)
with pytest.raises(ClientError) as ex:
client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.0.1/24",
TagSpecifications=[
{
"ResourceType": "snapshot",
"Tags": [{"Key": "TEST_TAG", "Value": "TEST_VALUE"}],
}
],
)
assert ex.value.response["Error"]["Code"] == "InvalidParameter"
assert (
ex.value.response["Error"]["Message"]
== "'snapshot' is not a valid taggable resource type for this operation."
)
client.create_subnet(
VpcId=vpc_id,
CidrBlock="10.0.0.1/24",
TagSpecifications=[
{
"ResourceType": "subnet",
"Tags": [{"Key": "TEST_TAG", "Value": "TEST_VALUE"}],
}
],
)
def get_filter(tag_val): def get_filter(tag_val):
return [ return [
{"Name": "tag-key", "Values": ["application"]}, {"Name": "tag-key", "Values": ["application"]},