Merge pull request #319 from spulec/cloudformation-ec2-SecurityGroupIngress

Add support to AWS::EC2::SecurityGroupIngress creation
This commit is contained in:
Steve Pulec 2015-02-25 14:05:22 -05:00
commit a2338fce86
3 changed files with 181 additions and 0 deletions

View File

@ -26,6 +26,7 @@ MODEL_MAP = {
"AWS::EC2::Route": ec2_models.Route,
"AWS::EC2::RouteTable": ec2_models.RouteTable,
"AWS::EC2::SecurityGroup": ec2_models.SecurityGroup,
"AWS::EC2::SecurityGroupIngress": ec2_models.SecurityGroupIngress,
"AWS::EC2::Subnet": ec2_models.Subnet,
"AWS::EC2::SubnetRouteTableAssociation": ec2_models.SubnetRouteTableAssociation,
"AWS::EC2::Volume": ec2_models.Volume,

View File

@ -121,6 +121,9 @@ class TaggedEC2Resource(object):
tags = self.ec2_backend.describe_tags(filters={'resource-id': [self.id]})
return tags
def add_tag(self, key, value):
self.ec2_backend.create_tags([self.id], {key: value})
def get_filter_value(self, filter_name):
tags = self.get_tags()
@ -1073,6 +1076,11 @@ class SecurityGroup(TaggedEC2Resource):
vpc_id=vpc_id,
)
for tag in properties.get("Tags", []):
tag_key = tag["Key"]
tag_value = tag["Value"]
security_group.add_tag(tag_key, tag_value)
for ingress_rule in properties.get('SecurityGroupIngress', []):
source_group_id = ingress_rule.get('SourceSecurityGroupId')
@ -1287,6 +1295,64 @@ class SecurityGroupBackend(object):
raise InvalidPermissionNotFoundError()
class SecurityGroupIngress(object):
def __init__(self, security_group, properties):
self.security_group = security_group
self.properties = properties
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
ec2_backend = ec2_backends[region_name]
group_name = properties.get('GroupName')
group_id = properties.get('GroupId')
ip_protocol = properties.get("IpProtocol")
cidr_ip = properties.get("CidrIp")
from_port = properties.get("FromPort")
source_security_group_id = properties.get("SourceSecurityGroupId")
source_security_group_name = properties.get("SourceSecurityGroupName")
source_security_owner_id = properties.get("SourceSecurityGroupOwnerId") # IGNORED AT THE MOMENT
to_port = properties.get("ToPort")
assert group_id or group_name
assert source_security_group_name or cidr_ip or source_security_group_id
assert ip_protocol
if source_security_group_id:
source_security_group_ids = [source_security_group_id]
else:
source_security_group_ids = None
if source_security_group_name:
source_security_group_names = [source_security_group_name]
else:
source_security_group_names = None
if cidr_ip:
ip_ranges = [cidr_ip]
else:
ip_ranges = []
if group_id:
security_group = ec2_backend.describe_security_groups(group_ids=[group_id])[0]
else:
security_group = ec2_backend.describe_security_groups(groupnames=[group_name])[0]
ec2_backend.authorize_security_group_ingress(
group_name=security_group.name,
group_id=security_group.id,
ip_protocol=ip_protocol,
from_port=from_port,
to_port=to_port,
ip_ranges=ip_ranges,
source_group_ids=source_security_group_ids,
source_group_names=source_security_group_names,
)
return cls(security_group, properties)
class VolumeAttachment(object):
def __init__(self, volume, instance, device):
self.volume = volume

View File

@ -1014,3 +1014,117 @@ def test_vpc_peering_creation():
peering_connections = vpc_conn.get_all_vpc_peering_connections()
peering_connections.should.have.length_of(1)
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id():
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test-security-group1": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"Tags": [
{
"Key": "sg-name",
"Value": "sg1"
}
]
},
},
"test-security-group2": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"Tags": [
{
"Key": "sg-name",
"Value": "sg2"
}
]
},
},
"test-sg-ingress": {
"Type": "AWS::EC2::SecurityGroupIngress",
"Properties": {
"GroupId": {"Ref": "test-security-group1"},
"IpProtocol": "tcp",
"FromPort": "80",
"ToPort": "8080",
"SourceSecurityGroupId": {"Ref": "test-security-group2"},
}
}
}
}
template_json = json.dumps(template)
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
ec2_conn = boto.ec2.connect_to_region("us-west-1")
security_group1 = ec2_conn.get_all_security_groups(filters={"tag:sg-name": "sg1"})[0]
security_group2 = ec2_conn.get_all_security_groups(filters={"tag:sg-name": "sg2"})[0]
security_group1.rules.should.have.length_of(1)
security_group1.rules[0].grants.should.have.length_of(1)
security_group1.rules[0].grants[0].group_id.should.equal(security_group2.id)
security_group1.rules[0].ip_protocol.should.equal('tcp')
security_group1.rules[0].from_port.should.equal('80')
security_group1.rules[0].to_port.should.equal('8080')
@mock_cloudformation
@mock_ec2
def test_security_group_ingress_separate_from_security_group_by_id():
ec2_conn = boto.ec2.connect_to_region("us-west-1")
ec2_conn.create_security_group("test-security-group1", "test security group")
template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"test-security-group2": {
"Type": "AWS::EC2::SecurityGroup",
"Properties": {
"GroupDescription": "test security group",
"Tags": [
{
"Key": "sg-name",
"Value": "sg2"
}
]
},
},
"test-sg-ingress": {
"Type": "AWS::EC2::SecurityGroupIngress",
"Properties": {
"GroupName": "test-security-group1",
"IpProtocol": "tcp",
"FromPort": "80",
"ToPort": "8080",
"SourceSecurityGroupId": {"Ref": "test-security-group2"},
}
}
}
}
template_json = json.dumps(template)
cf_conn = boto.cloudformation.connect_to_region("us-west-1")
cf_conn.create_stack(
"test_stack",
template_body=template_json,
)
security_group1 = ec2_conn.get_all_security_groups(groupnames=["test-security-group1"])[0]
security_group2 = ec2_conn.get_all_security_groups(filters={"tag:sg-name": "sg2"})[0]
security_group1.rules.should.have.length_of(1)
security_group1.rules[0].grants.should.have.length_of(1)
security_group1.rules[0].grants[0].group_id.should.equal(security_group2.id)
security_group1.rules[0].ip_protocol.should.equal('tcp')
security_group1.rules[0].from_port.should.equal('80')
security_group1.rules[0].to_port.should.equal('8080')