Added DB Security Group Create/List/Delete/Authorize

This commit is contained in:
Mike Fuller 2015-01-29 17:25:39 +11:00
parent eb23980937
commit 8614b50898
3 changed files with 154 additions and 102 deletions

View File

@ -266,6 +266,8 @@ class SecurityGroup(object):
self.ip_ranges = []
self.ec2_security_groups = []
self.tags = []
self.owner_id = '1234567890'
self.vpc_id = None
def to_xml(self):
template = Template("""<DBSecurityGroup>
@ -294,6 +296,21 @@ class SecurityGroup(object):
</DBSecurityGroup>""")
return template.render(security_group=self)
def to_json(self):
template = Template("""{
"DBSecurityGroupDescription": "{{ security_group.description }}",
"DBSecurityGroupName": "{{ security_group.group_name }}",
"EC2SecurityGroups": {{ security_group.ec2_security_groups }},
"IPRanges": [{%- for ip in security_group.ip_ranges -%}
{%- if loop.index != 1 -%},{%- endif -%}
"{{ ip }}"
{%- endfor -%}
],
"OwnerId": "{{ security_group.owner_id }}",
"VpcId": "{{ security_group.vpc_id }}"
}""")
return template.render(security_group=self)
def authorize_cidr(self, cidr_ip):
self.ip_ranges.append(cidr_ip)

View File

@ -162,30 +162,38 @@ class RDS2Response(BaseResponse):
template = self.response_template(REMOVE_TAGS_FROM_RESOURCE_TEMPLATE)
return template.render()
# TODO: Update function to new method
def create_dbsecurity_group(self):
return self.create_db_security_group()
def create_db_security_group(self):
group_name = self._get_param('DBSecurityGroupName')
description = self._get_param('DBSecurityGroupDescription')
security_group = self.backend.create_security_group(group_name, description)
template = self.response_template(CREATE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
# TODO: Update function to new method
def describe_dbsecurity_groups(self):
return self.describe_db_security_groups()
def describe_db_security_groups(self):
security_group_name = self._get_param('DBSecurityGroupName')
security_groups = self.backend.describe_security_groups(security_group_name)
template = self.response_template(DESCRIBE_SECURITY_GROUPS_TEMPLATE)
return template.render(security_groups=security_groups)
# TODO: Update function to new method
def delete_dbsecurity_group(self):
return self.delete_db_security_group()
def delete_db_security_group(self):
security_group_name = self._get_param('DBSecurityGroupName')
security_group = self.backend.delete_security_group(security_group_name)
template = self.response_template(DELETE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
# TODO: Update function to new method
def authorize_dbsecurity_group_ingress(self):
return self.authorize_db_security_group_ingress()
def authorize_db_security_group_ingress(self):
security_group_name = self._get_param('DBSecurityGroupName')
cidr_ip = self._get_param('CIDRIP')
security_group = self.backend.authorize_security_group(security_group_name, cidr_ip)
@ -193,6 +201,9 @@ class RDS2Response(BaseResponse):
return template.render(security_group=security_group)
def create_dbsubnet_group(self):
return self.create_db_subnet_group()
def create_db_subnet_group(self):
subnet_name = self._get_param('DBSubnetGroupName')
description = self._get_param('DBSubnetGroupDescription')
subnet_ids = self._get_multi_param('SubnetIds.member')
@ -202,13 +213,18 @@ class RDS2Response(BaseResponse):
return template.render(subnet_group=subnet_group)
def describe_dbsubnet_groups(self):
return self.describe_db_subnet_groups()
def describe_db_subnet_groups(self):
subnet_name = self._get_param('DBSubnetGroupName')
subnet_groups = self.backend.describe_subnet_groups(subnet_name)
template = self.response_template(DESCRIBE_SUBNET_GROUPS_TEMPLATE)
return template.render(subnet_groups=subnet_groups)
# TODO: Update function to new method
def delete_dbsubnet_group(self):
return self.delete_db_subnet_group()
def delete_db_subnet_group(self):
subnet_name = self._get_param('DBSubnetGroupName')
subnet_group = self.backend.delete_subnet_group(subnet_name)
template = self.response_template(DELETE_SUBNET_GROUP_TEMPLATE)
@ -331,42 +347,49 @@ DELETE_DATABASE_TEMPLATE = """{ "DeleteDBInstanceResponse": {
}
}"""
CREATE_SECURITY_GROUP_TEMPLATE = """<CreateDBSecurityGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBSecurityGroupResult>
{{ security_group.to_xml() }}
</CreateDBSecurityGroupResult>
<ResponseMetadata>
<RequestId>e68ef6fa-afc1-11c3-845a-476777009d19</RequestId>
</ResponseMetadata>
</CreateDBSecurityGroupResponse>"""
CREATE_SECURITY_GROUP_TEMPLATE = """{"CreateDBSecurityGroupResponse": {
"CreateDBSecurityGroupResult": {
"DBSecurityGroup":
{{ security_group.to_json() }},
"ResponseMetadata": {
"RequestId": "462165d0-a77a-11e4-a5fa-75b30c556f97"
}}
}
}"""
DESCRIBE_SECURITY_GROUPS_TEMPLATE = """<DescribeDBSecurityGroupsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DescribeDBSecurityGroupsResult>
<DBSecurityGroups>
{% for security_group in security_groups %}
{{ security_group.to_xml() }}
{% endfor %}
</DBSecurityGroups>
</DescribeDBSecurityGroupsResult>
<ResponseMetadata>
<RequestId>b76e692c-b98c-11d3-a907-5a2c468b9cb0</RequestId>
</ResponseMetadata>
</DescribeDBSecurityGroupsResponse>"""
DESCRIBE_SECURITY_GROUPS_TEMPLATE = """{
"DescribeDBSecurityGroupsResponse": {
"ResponseMetadata": {
"RequestId": "5df2014e-a779-11e4-bdb0-594def064d0c"
},
"DescribeDBSecurityGroupsResult": {
"Marker": "null",
"DBSecurityGroups": [
{% for security_group in security_groups %}
{%- if loop.index != 1 -%},{%- endif -%}
{{ security_group.to_json() }}
{% endfor %}
]
}
}
}"""
DELETE_SECURITY_GROUP_TEMPLATE = """<DeleteDBSecurityGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<ResponseMetadata>
<RequestId>7aec7454-ba25-11d3-855b-576787000e19</RequestId>
</ResponseMetadata>
</DeleteDBSecurityGroupResponse>"""
DELETE_SECURITY_GROUP_TEMPLATE = """{"DeleteDBSecurityGroupResponse": {
"ResponseMetadata": {
"RequestId": "97e846bd-a77d-11e4-ac58-91351c0f3426"
}
}}"""
AUTHORIZE_SECURITY_GROUP_TEMPLATE = """<AuthorizeDBSecurityGroupIngressResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<AuthorizeDBSecurityGroupIngressResult>
{{ security_group.to_xml() }}
</AuthorizeDBSecurityGroupIngressResult>
<ResponseMetadata>
<RequestId>6176b5f8-bfed-11d3-f92b-31fa5e8dbc99</RequestId>
</ResponseMetadata>
</AuthorizeDBSecurityGroupIngressResponse>"""
AUTHORIZE_SECURITY_GROUP_TEMPLATE = """{
"AuthorizeDBSecurityGroupIngressResponse": {
"AuthorizeDBSecurityGroupIngressResult": {
"DBSecurityGroup": {{ security_group.to_json() }}
},
"ResponseMetadata": {
"RequestId": "75d32fd5-a77e-11e4-8892-b10432f7a87d"
}
}
}"""
CREATE_SUBNET_GROUP_TEMPLATE = """{
"CreateDBSubnetGroupResponse": {

View File

@ -359,70 +359,82 @@ def test_remove_tags_option_group():
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(1)
#@disable_on_py3()
#@mock_rds2
#def test_create_database_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
#
# security_group = conn.create_dbsecurity_group('db_sg', 'DB Security Group')
# security_group.name.should.equal('db_sg')
# security_group.description.should.equal("DB Security Group")
# list(security_group.ip_ranges).should.equal([])
#
#
#@mock_rds2
#def test_get_security_groups():
# conn = boto.rds2.connect_to_region("us-west-2")
#
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(0)
#
# conn.create_dbsecurity_group('db_sg1', 'DB Security Group')
# conn.create_dbsecurity_group('db_sg2', 'DB Security Group')
#
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(2)
#
# databases = conn.get_all_dbsecurity_groups("db_sg1")
# list(databases).should.have.length_of(1)
#
# databases[0].name.should.equal("db_sg1")
#
#
#@mock_rds2
#def test_get_non_existant_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.get_all_dbsecurity_groups.when.called_with("not-a-sg").should.throw(BotoServerError)
#
#
#@mock_rds2
#def test_delete_database_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.create_dbsecurity_group('db_sg', 'DB Security Group')
#
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(1)
#
# conn.delete_dbsecurity_group("db_sg")
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(0)
#
#
#@mock_rds2
#def test_delete_non_existant_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.delete_dbsecurity_group.when.called_with("not-a-db").should.throw(BotoServerError)
#
#
#@disable_on_py3()
#@mock_rds2
#def test_security_group_authorize():
# conn = boto.rds2.connect_to_region("us-west-2")
# security_group = conn.create_dbsecurity_group('db_sg', 'DB Security Group')
# list(security_group.ip_ranges).should.equal([])
#
# security_group.authorize(cidr_ip='10.3.2.45/32')
# security_group = conn.get_all_dbsecurity_groups()[0]
# list(security_group.ip_ranges).should.have.length_of(1)
# security_group.ip_ranges[0].cidr_ip.should.equal('10.3.2.45/32')
#
#
@disable_on_py3()
@mock_rds2
def test_create_database_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.create_db_security_group('db_sg', 'DB Security Group')
result['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['DBSecurityGroupName'].should.equal("db_sg")
result['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['DBSecurityGroupDescription'].should.equal("DB Security Group")
result['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['IPRanges'].should.equal([])
@mock_rds2
def test_get_security_groups():
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(0)
conn.create_db_security_group('db_sg1', 'DB Security Group')
conn.create_db_security_group('db_sg2', 'DB Security Group')
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(2)
result = conn.describe_db_security_groups("db_sg1")
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(1)
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['DBSecurityGroupName'].should.equal("db_sg1")
@mock_rds2
def test_get_non_existant_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.describe_db_security_groups.when.called_with("not-a-sg").should.throw(BotoServerError)
@mock_rds2
def test_delete_database_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_security_group('db_sg', 'DB Security Group')
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(1)
conn.delete_db_security_group("db_sg")
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(0)
@mock_rds2
def test_delete_non_existant_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.delete_db_security_group.when.called_with("not-a-db").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_security_group_authorize():
conn = boto.rds2.connect_to_region("us-west-2")
security_group = conn.create_db_security_group('db_sg', 'DB Security Group')
security_group['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['IPRanges'].should.equal([])
conn.authorize_db_security_group_ingress(db_security_group_name='db_sg',
cidrip='10.3.2.45/32')
result = conn.describe_db_security_groups("db_sg")
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.have.length_of(1)
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.equal(['10.3.2.45/32'])
conn.authorize_db_security_group_ingress(db_security_group_name='db_sg',
cidrip='10.3.2.46/32')
result = conn.describe_db_security_groups("db_sg")
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.have.length_of(2)
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.equal(['10.3.2.45/32', '10.3.2.46/32'])
#@disable_on_py3()
#@mock_rds2
#def test_add_security_group_to_database():