parent
df8bd43a45
commit
0f32f3c50c
@ -2231,7 +2231,9 @@ class SecurityGroupBackend(object):
|
||||
ip_ranges = [json.loads(ip_ranges)]
|
||||
if ip_ranges:
|
||||
for cidr in ip_ranges:
|
||||
if not is_valid_cidr(cidr["CidrIp"]):
|
||||
if (type(cidr) is dict and not is_valid_cidr(cidr["CidrIp"])) or (
|
||||
type(cidr) is str and not is_valid_cidr(cidr)
|
||||
):
|
||||
raise InvalidCIDRSubnetError(cidr=cidr)
|
||||
|
||||
self._verify_group_will_respect_rule_count_limit(
|
||||
@ -2432,6 +2434,7 @@ class SecurityGroupIngress(CloudFormationModel):
|
||||
group_id = properties.get("GroupId")
|
||||
ip_protocol = properties.get("IpProtocol")
|
||||
cidr_ip = properties.get("CidrIp")
|
||||
cidr_desc = properties.get("Description")
|
||||
cidr_ipv6 = properties.get("CidrIpv6")
|
||||
from_port = properties.get("FromPort")
|
||||
source_security_group_id = properties.get("SourceSecurityGroupId")
|
||||
@ -2458,7 +2461,7 @@ class SecurityGroupIngress(CloudFormationModel):
|
||||
else:
|
||||
source_security_group_names = None
|
||||
if cidr_ip:
|
||||
ip_ranges = [cidr_ip]
|
||||
ip_ranges = [{"CidrIp": cidr_ip, "Description": cidr_desc}]
|
||||
else:
|
||||
ip_ranges = []
|
||||
|
||||
|
@ -714,15 +714,10 @@ def test_description_in_ip_permissions():
|
||||
)
|
||||
|
||||
result = conn.describe_security_groups(GroupIds=[sg["GroupId"]])
|
||||
group = result["SecurityGroups"][0]
|
||||
|
||||
assert (
|
||||
result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]["Description"]
|
||||
== "testDescription"
|
||||
)
|
||||
assert (
|
||||
result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]["CidrIp"]
|
||||
== "1.2.3.4/32"
|
||||
)
|
||||
assert group["IpPermissions"][0]["IpRanges"][0]["Description"] == "testDescription"
|
||||
assert group["IpPermissions"][0]["IpRanges"][0]["CidrIp"] == "1.2.3.4/32"
|
||||
|
||||
sg = conn.create_security_group(
|
||||
GroupName="sg2", Description="Test security group sg1", VpcId=vpc.id
|
||||
@ -741,17 +736,10 @@ def test_description_in_ip_permissions():
|
||||
)
|
||||
|
||||
result = conn.describe_security_groups(GroupIds=[sg["GroupId"]])
|
||||
group = result["SecurityGroups"][0]
|
||||
|
||||
assert (
|
||||
result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0].get(
|
||||
"Description"
|
||||
)
|
||||
is None
|
||||
)
|
||||
assert (
|
||||
result["SecurityGroups"][0]["IpPermissions"][0]["IpRanges"][0]["CidrIp"]
|
||||
== "1.2.3.4/32"
|
||||
)
|
||||
assert group["IpPermissions"][0]["IpRanges"][0].get("Description") is None
|
||||
assert group["IpPermissions"][0]["IpRanges"][0]["CidrIp"] == "1.2.3.4/32"
|
||||
|
||||
|
||||
@mock_ec2
|
||||
|
139
tests/test_ec2/test_security_groups_cloudformation.py
Normal file
139
tests/test_ec2/test_security_groups_cloudformation.py
Normal file
@ -0,0 +1,139 @@
|
||||
import boto3
|
||||
import sure # noqa
|
||||
from moto import mock_cloudformation, mock_ec2
|
||||
|
||||
|
||||
SEC_GROUP_INGRESS = """{
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
"Description": "AWS CloudFormation Template to create an EC2 instance",
|
||||
"Parameters": {
|
||||
"VPCId": {
|
||||
"Type": "String",
|
||||
"Description": "The VPC ID",
|
||||
"AllowedPattern": "^vpc-[a-zA-Z0-9]*"
|
||||
}
|
||||
},
|
||||
"Resources": {
|
||||
"SecurityGroup": {
|
||||
"Type": "AWS::EC2::SecurityGroup",
|
||||
"Properties": {
|
||||
"GroupDescription": "Test VPC security group",
|
||||
"GroupName": "My-SG",
|
||||
"VpcId": {
|
||||
"Ref": "VPCId"
|
||||
}
|
||||
}
|
||||
},
|
||||
"SSHIngressRule": {
|
||||
"Type": "AWS::EC2::SecurityGroupIngress",
|
||||
"Properties": {
|
||||
"CidrIp": "10.0.0.0/8",
|
||||
"Description": "Allow SSH traffic from 10.0.0.0/8",
|
||||
"FromPort": 22,
|
||||
"ToPort": 22,
|
||||
"GroupId": {
|
||||
"Fn::GetAtt": [
|
||||
"SecurityGroup",
|
||||
"GroupId"
|
||||
]
|
||||
},
|
||||
"IpProtocol": "tcp"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
SEC_GROUP_INGRESS_WITHOUT_DESC = """{
|
||||
"AWSTemplateFormatVersion": "2010-09-09",
|
||||
"Description": "AWS CloudFormation Template to create an EC2 instance",
|
||||
"Parameters": {
|
||||
"VPCId": {
|
||||
"Type": "String",
|
||||
"Description": "The VPC ID",
|
||||
"AllowedPattern": "^vpc-[a-zA-Z0-9]*"
|
||||
}
|
||||
},
|
||||
"Resources": {
|
||||
"SecurityGroup": {
|
||||
"Type": "AWS::EC2::SecurityGroup",
|
||||
"Properties": {
|
||||
"GroupDescription": "Test VPC security group",
|
||||
"GroupName": "My-SG",
|
||||
"VpcId": {
|
||||
"Ref": "VPCId"
|
||||
}
|
||||
}
|
||||
},
|
||||
"SSHIngressRule": {
|
||||
"Type": "AWS::EC2::SecurityGroupIngress",
|
||||
"Properties": {
|
||||
"CidrIp": "10.0.0.0/8",
|
||||
"FromPort": 22,
|
||||
"ToPort": 22,
|
||||
"GroupId": {
|
||||
"Fn::GetAtt": [
|
||||
"SecurityGroup",
|
||||
"GroupId"
|
||||
]
|
||||
},
|
||||
"IpProtocol": "tcp"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_ec2
|
||||
def test_security_group_ingress():
|
||||
cf_client = boto3.client("cloudformation", region_name="us-east-1")
|
||||
ec2 = boto3.resource("ec2", region_name="us-west-1")
|
||||
ec2_client = boto3.client("ec2", region_name="us-east-1")
|
||||
|
||||
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
|
||||
cf_client.create_stack(
|
||||
StackName="test_stack",
|
||||
TemplateBody=SEC_GROUP_INGRESS,
|
||||
Parameters=[{"ParameterKey": "VPCId", "ParameterValue": vpc.id}],
|
||||
Capabilities=["CAPABILITY_NAMED_IAM"],
|
||||
OnFailure="DELETE",
|
||||
)
|
||||
|
||||
groups = ec2_client.describe_security_groups()["SecurityGroups"]
|
||||
group = [g for g in groups if g["GroupName"] == "My-SG"][0]
|
||||
group["Description"].should.equal("Test VPC security group")
|
||||
len(group["IpPermissions"]).should.be(1)
|
||||
ingress = group["IpPermissions"][0]
|
||||
ingress["FromPort"].should.equal(22)
|
||||
ingress["ToPort"].should.equal(22)
|
||||
ingress["IpProtocol"].should.equal("tcp")
|
||||
ingress["IpRanges"].should.equal(
|
||||
[{"CidrIp": "10.0.0.0/8", "Description": "Allow SSH traffic from 10.0.0.0/8"}]
|
||||
)
|
||||
|
||||
|
||||
@mock_cloudformation
|
||||
@mock_ec2
|
||||
def test_security_group_ingress_without_description():
|
||||
cf_client = boto3.client("cloudformation", region_name="us-east-1")
|
||||
ec2 = boto3.resource("ec2", region_name="us-west-1")
|
||||
ec2_client = boto3.client("ec2", region_name="us-east-1")
|
||||
|
||||
vpc = ec2.create_vpc(CidrBlock="10.0.0.0/16")
|
||||
cf_client.create_stack(
|
||||
StackName="test_stack",
|
||||
TemplateBody=SEC_GROUP_INGRESS_WITHOUT_DESC,
|
||||
Parameters=[{"ParameterKey": "VPCId", "ParameterValue": vpc.id}],
|
||||
Capabilities=["CAPABILITY_NAMED_IAM"],
|
||||
OnFailure="DELETE",
|
||||
)
|
||||
|
||||
groups = ec2_client.describe_security_groups()["SecurityGroups"]
|
||||
group = [g for g in groups if g["GroupName"] == "My-SG"][0]
|
||||
group["Description"].should.equal("Test VPC security group")
|
||||
len(group["IpPermissions"]).should.be(1)
|
||||
ingress = group["IpPermissions"][0]
|
||||
ingress["IpRanges"].should.equal([{"CidrIp": "10.0.0.0/8"}])
|
Loading…
Reference in New Issue
Block a user