Merge remote-tracking branch 'upstream/master' into adding_iam_credentials_report

This commit is contained in:
Mike Fuller 2015-02-04 12:40:36 +11:00
commit d1f82b83ec
13 changed files with 1976 additions and 1 deletions

View File

@ -33,3 +33,5 @@ Moto is written by Steve Pulec with contributions from:
* [Peter](https://github.com/pvbouwel) * [Peter](https://github.com/pvbouwel)
* [Tyler Sanders](https://github.com/tsanders) * [Tyler Sanders](https://github.com/tsanders)
* [Gary Dalton](https://github.com/gary-dalton) * [Gary Dalton](https://github.com/gary-dalton)
* [Chris Henry](https://github.com/chrishenry)
* [Mike Fuller](https://github.com/mfulleratlassian)

View File

@ -79,6 +79,8 @@ It gets even better! Moto isn't just S3. Here's the status of the other AWS serv
|------------------------------------------------------------------------------| |------------------------------------------------------------------------------|
| RDS | @mock_rds | core endpoints done | | RDS | @mock_rds | core endpoints done |
|------------------------------------------------------------------------------| |------------------------------------------------------------------------------|
| RDS2 | @mock_rds2 | core endpoints done |
|------------------------------------------------------------------------------|
| Route53 | @mock_route53 | core endpoints done | | Route53 | @mock_route53 | core endpoints done |
|------------------------------------------------------------------------------| |------------------------------------------------------------------------------|
| S3 | @mock_s3 | core endpoints done | | S3 | @mock_s3 | core endpoints done |

View File

@ -13,6 +13,7 @@ from .emr import mock_emr # flake8: noqa
from .iam import mock_iam # flake8: noqa from .iam import mock_iam # flake8: noqa
from .kinesis import mock_kinesis # flake8: noqa from .kinesis import mock_kinesis # flake8: noqa
from .rds import mock_rds # flake8: noqa from .rds import mock_rds # flake8: noqa
from .rds2 import mock_rds2 # flake8: noqa
from .redshift import mock_redshift # flake8: noqa from .redshift import mock_redshift # flake8: noqa
from .s3 import mock_s3 # flake8: noqa from .s3 import mock_s3 # flake8: noqa
from .s3bucket_path import mock_s3bucket_path # flake8: noqa from .s3bucket_path import mock_s3bucket_path # flake8: noqa

View File

@ -199,6 +199,12 @@ class Database(object):
<PubliclyAccessible>{{ database.publicly_accessible }}</PubliclyAccessible> <PubliclyAccessible>{{ database.publicly_accessible }}</PubliclyAccessible>
<AutoMinorVersionUpgrade>{{ database.auto_minor_version_upgrade }}</AutoMinorVersionUpgrade> <AutoMinorVersionUpgrade>{{ database.auto_minor_version_upgrade }}</AutoMinorVersionUpgrade>
<AllocatedStorage>{{ database.allocated_storage }}</AllocatedStorage> <AllocatedStorage>{{ database.allocated_storage }}</AllocatedStorage>
{% if database.iops %}
<Iops>{{ database.iops }}</Iops>
<StorageType>io1</StorageType>
{% else %}
<StorageType>{{ database.storage_type }}</StorageType>
{% endif %}
<DBInstanceClass>{{ database.db_instance_class }}</DBInstanceClass> <DBInstanceClass>{{ database.db_instance_class }}</DBInstanceClass>
<MasterUsername>{{ database.master_username }}</MasterUsername> <MasterUsername>{{ database.master_username }}</MasterUsername>
<Endpoint> <Endpoint>

12
moto/rds2/__init__.py Normal file
View File

@ -0,0 +1,12 @@
from __future__ import unicode_literals
from .models import rds2_backends
from ..core.models import MockAWS
rds2_backend = rds2_backends['us-west-1']
def mock_rds2(func=None):
if func:
return MockAWS(rds2_backends)(func)
else:
return MockAWS(rds2_backends)

39
moto/rds2/exceptions.py Normal file
View File

@ -0,0 +1,39 @@
from __future__ import unicode_literals
import json
from werkzeug.exceptions import BadRequest
class RDSClientError(BadRequest):
def __init__(self, code, message):
super(RDSClientError, self).__init__()
self.description = json.dumps({
"Error": {
"Code": code,
"Message": message,
'Type': 'Sender',
},
'RequestId': '6876f774-7273-11e4-85dc-39e55ca848d1',
})
class DBInstanceNotFoundError(RDSClientError):
def __init__(self, database_identifier):
super(DBInstanceNotFoundError, self).__init__(
'DBInstanceNotFound',
"Database {0} not found.".format(database_identifier))
class DBSecurityGroupNotFoundError(RDSClientError):
def __init__(self, security_group_name):
super(DBSecurityGroupNotFoundError, self).__init__(
'DBSecurityGroupNotFound',
"Security Group {0} not found.".format(security_group_name))
class DBSubnetGroupNotFoundError(RDSClientError):
def __init__(self, subnet_group_name):
super(DBSubnetGroupNotFoundError, self).__init__(
'DBSubnetGroupNotFound',
"Subnet Group {0} not found.".format(subnet_group_name))

786
moto/rds2/models.py Normal file

File diff suppressed because one or more lines are too long

507
moto/rds2/responses.py Normal file
View File

@ -0,0 +1,507 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backends
from .models import rds2_backends
import json
import re
class RDS2Response(BaseResponse):
@property
def backend(self):
return rds2_backends[self.region]
def _get_db_kwargs(self):
args = {
"auto_minor_version_upgrade": self._get_param('AutoMinorVersionUpgrade'),
"allocated_storage": self._get_int_param('AllocatedStorage'),
"availability_zone": self._get_param("AvailabilityZone"),
"backup_retention_period": self._get_param("BackupRetentionPeriod"),
"db_instance_class": self._get_param('DBInstanceClass'),
"db_instance_identifier": self._get_param('DBInstanceIdentifier'),
"db_name": self._get_param("DBName"),
# DBParameterGroupName
"db_subnet_group_name": self._get_param("DBSubnetGroupName"),
"engine": self._get_param("Engine"),
"engine_version": self._get_param("EngineVersion"),
"iops": self._get_int_param("Iops"),
"master_password": self._get_param('MasterUserPassword'),
"master_username": self._get_param('MasterUsername'),
"multi_az": self._get_bool_param("MultiAZ"),
# OptionGroupName
"port": self._get_param('Port'),
# PreferredBackupWindow
# PreferredMaintenanceWindow
"publicly_accessible": self._get_param("PubliclyAccessible"),
"region": self.region,
"security_groups": self._get_multi_param('DBSecurityGroups.member'),
"storage_type": self._get_param("StorageType"),
# VpcSecurityGroupIds.member.N
"tags": list()
}
args['tags'] = self.unpack_complex_list_params('Tags.member', ('Key', 'Value'))
return args
def _get_db_replica_kwargs(self):
return {
"auto_minor_version_upgrade": self._get_param('AutoMinorVersionUpgrade'),
"availability_zone": self._get_param("AvailabilityZone"),
"db_instance_class": self._get_param('DBInstanceClass'),
"db_instance_identifier": self._get_param('DBInstanceIdentifier'),
"db_subnet_group_name": self._get_param("DBSubnetGroupName"),
"iops": self._get_int_param("Iops"),
# OptionGroupName
"port": self._get_param('Port'),
"publicly_accessible": self._get_param("PubliclyAccessible"),
"source_db_identifier": self._get_param('SourceDBInstanceIdentifier'),
"storage_type": self._get_param("StorageType"),
}
def _get_option_group_kwargs(self):
return {
'major_engine_version': self._get_param('MajorEngineVersion'),
'description': self._get_param('OptionGroupDescription'),
'engine_name': self._get_param('EngineName'),
'name': self._get_param('OptionGroupName')
}
def unpack_complex_list_params(self, label, names):
unpacked_list = list()
count = 1
while self._get_param('{0}.{1}.{2}'.format(label, count, names[0])):
param = dict()
for i in range(len(names)):
param[names[i]] = self._get_param('{0}.{1}.{2}'.format(label, count, names[i]))
unpacked_list.append(param)
count += 1
return unpacked_list
def unpack_list_params(self, label):
unpacked_list = list()
count = 1
while self._get_param('{0}.{1}'.format(label, count)):
unpacked_list.append(self._get_param('{0}.{1}'.format(label, count)))
count += 1
return unpacked_list
def create_dbinstance(self):
return self.create_db_instance()
def create_db_instance(self):
db_kwargs = self._get_db_kwargs()
database = self.backend.create_database(db_kwargs)
template = self.response_template(CREATE_DATABASE_TEMPLATE)
return template.render(database=database)
def create_dbinstance_read_replica(self):
return self.create_db_instance_read_replica()
def create_db_instance_read_replica(self):
db_kwargs = self._get_db_replica_kwargs()
database = self.backend.create_database_replica(db_kwargs)
template = self.response_template(CREATE_DATABASE_REPLICA_TEMPLATE)
return template.render(database=database)
def describe_dbinstances(self):
return self.describe_db_instances()
def describe_db_instances(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
databases = self.backend.describe_databases(db_instance_identifier)
template = self.response_template(DESCRIBE_DATABASES_TEMPLATE)
return template.render(databases=databases)
def modify_dbinstance(self):
return self.modify_db_instance()
def modify_db_instance(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
db_kwargs = self._get_db_kwargs()
database = self.backend.modify_database(db_instance_identifier, db_kwargs)
template = self.response_template(MODIFY_DATABASE_TEMPLATE)
return template.render(database=database)
def delete_dbinstance(self):
return self.delete_db_instance()
def delete_db_instance(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
database = self.backend.delete_database(db_instance_identifier)
template = self.response_template(DELETE_DATABASE_TEMPLATE)
return template.render(database=database)
def reboot_dbinstance(self):
return self.reboot_db_instance()
def reboot_db_instance(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
database = self.backend.reboot_db_instance(db_instance_identifier)
template = self.response_template(REBOOT_DATABASE_TEMPLATE)
return template.render(database=database)
def list_tags_for_resource(self):
arn = self._get_param('ResourceName')
template = self.response_template(LIST_TAGS_FOR_RESOURCE_TEMPLATE)
tags = self.backend.list_tags_for_resource(arn)
return template.render(tags=tags)
def add_tags_to_resource(self):
arn = self._get_param('ResourceName')
tags = self.unpack_complex_list_params('Tags.member', ('Key', 'Value'))
tags = self.backend.add_tags_to_resource(arn, tags)
template = self.response_template(ADD_TAGS_TO_RESOURCE_TEMPLATE)
return template.render(tags=tags)
def remove_tags_from_resource(self):
arn = self._get_param('ResourceName')
tag_keys = self.unpack_list_params('TagKeys.member')
self.backend.remove_tags_from_resource(arn, tag_keys)
template = self.response_template(REMOVE_TAGS_FROM_RESOURCE_TEMPLATE)
return template.render()
def create_dbsecurity_group(self):
return self.create_db_security_group()
def create_db_security_group(self):
group_name = self._get_param('DBSecurityGroupName')
description = self._get_param('DBSecurityGroupDescription')
security_group = self.backend.create_security_group(group_name, description)
template = self.response_template(CREATE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
def describe_dbsecurity_groups(self):
return self.describe_db_security_groups()
def describe_db_security_groups(self):
security_group_name = self._get_param('DBSecurityGroupName')
security_groups = self.backend.describe_security_groups(security_group_name)
template = self.response_template(DESCRIBE_SECURITY_GROUPS_TEMPLATE)
return template.render(security_groups=security_groups)
def delete_dbsecurity_group(self):
return self.delete_db_security_group()
def delete_db_security_group(self):
security_group_name = self._get_param('DBSecurityGroupName')
security_group = self.backend.delete_security_group(security_group_name)
template = self.response_template(DELETE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
def authorize_dbsecurity_group_ingress(self):
return self.authorize_db_security_group_ingress()
def authorize_db_security_group_ingress(self):
security_group_name = self._get_param('DBSecurityGroupName')
cidr_ip = self._get_param('CIDRIP')
security_group = self.backend.authorize_security_group(security_group_name, cidr_ip)
template = self.response_template(AUTHORIZE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
def create_dbsubnet_group(self):
return self.create_db_subnet_group()
def create_db_subnet_group(self):
subnet_name = self._get_param('DBSubnetGroupName')
description = self._get_param('DBSubnetGroupDescription')
subnet_ids = self._get_multi_param('SubnetIds.member')
subnets = [ec2_backends[self.region].get_subnet(subnet_id) for subnet_id in subnet_ids]
subnet_group = self.backend.create_subnet_group(subnet_name, description, subnets)
template = self.response_template(CREATE_SUBNET_GROUP_TEMPLATE)
return template.render(subnet_group=subnet_group)
def describe_dbsubnet_groups(self):
return self.describe_db_subnet_groups()
def describe_db_subnet_groups(self):
subnet_name = self._get_param('DBSubnetGroupName')
subnet_groups = self.backend.describe_subnet_groups(subnet_name)
template = self.response_template(DESCRIBE_SUBNET_GROUPS_TEMPLATE)
return template.render(subnet_groups=subnet_groups)
def delete_dbsubnet_group(self):
return self.delete_db_subnet_group()
def delete_db_subnet_group(self):
subnet_name = self._get_param('DBSubnetGroupName')
subnet_group = self.backend.delete_subnet_group(subnet_name)
template = self.response_template(DELETE_SUBNET_GROUP_TEMPLATE)
return template.render(subnet_group=subnet_group)
def create_option_group(self):
kwargs = self._get_option_group_kwargs()
option_group = self.backend.create_option_group(kwargs)
template = self.response_template(CREATE_OPTION_GROUP_TEMPLATE)
return template.render(option_group=option_group)
def delete_option_group(self):
kwargs = self._get_option_group_kwargs()
option_group = self.backend.delete_option_group(kwargs['name'])
template = self.response_template(DELETE_OPTION_GROUP_TEMPLATE)
return template.render(option_group=option_group)
def describe_option_groups(self):
kwargs = self._get_option_group_kwargs()
kwargs['max_records'] = self._get_param('MaxRecords')
kwargs['marker'] = self._get_param('Marker')
option_groups = self.backend.describe_option_groups(kwargs)
template = self.response_template(DESCRIBE_OPTION_GROUP_TEMPLATE)
return template.render(option_groups=option_groups)
def describe_option_group_options(self):
engine_name = self._get_param('EngineName')
major_engine_version = self._get_param('MajorEngineVersion')
option_group_options = self.backend.describe_option_group_options(engine_name, major_engine_version)
return option_group_options
def modify_option_group(self):
option_group_name = self._get_param('OptionGroupName')
count = 1
options_to_include = []
while self._get_param('OptionsToInclude.member.{0}.OptionName'.format(count)):
options_to_include.append({
'Port': self._get_param('OptionsToInclude.member.{0}.Port'.format(count)),
'OptionName': self._get_param('OptionsToInclude.member.{0}.OptionName'.format(count)),
'DBSecurityGroupMemberships': self._get_param('OptionsToInclude.member.{0}.DBSecurityGroupMemberships'.format(count)),
'OptionSettings': self._get_param('OptionsToInclude.member.{0}.OptionSettings'.format(count)),
'VpcSecurityGroupMemberships': self._get_param('OptionsToInclude.member.{0}.VpcSecurityGroupMemberships'.format(count))
})
count += 1
count = 1
options_to_remove = []
while self._get_param('OptionsToRemove.member.{0}'.format(count)):
options_to_remove.append(self._get_param('OptionsToRemove.member.{0}'.format(count)))
count += 1
apply_immediately = self._get_param('ApplyImmediately')
option_group = self.backend.modify_option_group(option_group_name,
options_to_include,
options_to_remove,
apply_immediately)
template = self.response_template(MODIFY_OPTION_GROUP_TEMPLATE)
return template.render(option_group=option_group)
CREATE_DATABASE_TEMPLATE = """{
"CreateDBInstanceResponse": {
"CreateDBInstanceResult": {
"DBInstance": {{ database.to_json() }}
},
"ResponseMetadata": { "RequestId": "523e3218-afc7-11c3-90f5-f90431260ab4" }
}
}"""
CREATE_DATABASE_REPLICA_TEMPLATE = """{"CreateDBInstanceReadReplicaResponse": {
"ResponseMetadata": {
"RequestId": "5e60c46d-a844-11e4-bb68-17f36418e58f"
},
"CreateDBInstanceReadReplicaResult": {
"DBInstance": {{ database.to_json() }}
}
}}"""
DESCRIBE_DATABASES_TEMPLATE = """{
"DescribeDBInstancesResponse": {
"DescribeDBInstancesResult": {
"DBInstances": [
{%- for database in databases -%}
{%- if loop.index != 1 -%},{%- endif -%}
{{ database.to_json() }}
{%- endfor -%}
]
},
"ResponseMetadata": { "RequestId": "523e3218-afc7-11c3-90f5-f90431260ab4" }
}
}"""
MODIFY_DATABASE_TEMPLATE = """{"ModifyDBInstanceResponse": {
"ModifyDBInstanceResult": {
"DBInstance": {{ database.to_json() }},
"ResponseMetadata": {
"RequestId": "bb58476c-a1a8-11e4-99cf-55e92d4bbada"
}
}
}
}"""
REBOOT_DATABASE_TEMPLATE = """{"RebootDBInstanceResponse": {
"RebootDBInstanceResult": {
"DBInstance": {{ database.to_json() }},
"ResponseMetadata": {
"RequestId": "d55711cb-a1ab-11e4-99cf-55e92d4bbada"
}
}
}
}"""
DELETE_DATABASE_TEMPLATE = """{ "DeleteDBInstanceResponse": {
"DeleteDBInstanceResult": {
"DBInstance": {{ database.to_json() }}
},
"ResponseMetadata": {
"RequestId": "523e3218-afc7-11c3-90f5-f90431260ab4"
}
}
}"""
CREATE_SECURITY_GROUP_TEMPLATE = """{"CreateDBSecurityGroupResponse": {
"CreateDBSecurityGroupResult": {
"DBSecurityGroup":
{{ security_group.to_json() }},
"ResponseMetadata": {
"RequestId": "462165d0-a77a-11e4-a5fa-75b30c556f97"
}}
}
}"""
DESCRIBE_SECURITY_GROUPS_TEMPLATE = """{
"DescribeDBSecurityGroupsResponse": {
"ResponseMetadata": {
"RequestId": "5df2014e-a779-11e4-bdb0-594def064d0c"
},
"DescribeDBSecurityGroupsResult": {
"Marker": "null",
"DBSecurityGroups": [
{% for security_group in security_groups %}
{%- if loop.index != 1 -%},{%- endif -%}
{{ security_group.to_json() }}
{% endfor %}
]
}
}
}"""
DELETE_SECURITY_GROUP_TEMPLATE = """{"DeleteDBSecurityGroupResponse": {
"ResponseMetadata": {
"RequestId": "97e846bd-a77d-11e4-ac58-91351c0f3426"
}
}}"""
AUTHORIZE_SECURITY_GROUP_TEMPLATE = """{
"AuthorizeDBSecurityGroupIngressResponse": {
"AuthorizeDBSecurityGroupIngressResult": {
"DBSecurityGroup": {{ security_group.to_json() }}
},
"ResponseMetadata": {
"RequestId": "75d32fd5-a77e-11e4-8892-b10432f7a87d"
}
}
}"""
CREATE_SUBNET_GROUP_TEMPLATE = """{
"CreateDBSubnetGroupResponse": {
"CreateDBSubnetGroupResult":
{ {{ subnet_group.to_json() }} },
"ResponseMetadata": { "RequestId": "3a401b3f-bb9e-11d3-f4c6-37db295f7674" }
}
}"""
DESCRIBE_SUBNET_GROUPS_TEMPLATE = """{
"DescribeDBSubnetGroupsResponse": {
"DescribeDBSubnetGroupsResult": {
"DBSubnetGroups": [
{% for subnet_group in subnet_groups %}
{ {{ subnet_group.to_json() }} }{%- if not loop.last -%},{%- endif -%}
{% endfor %}
],
"Marker": null
},
"ResponseMetadata": { "RequestId": "b783db3b-b98c-11d3-fbc7-5c0aad74da7c" }
}
}"""
DELETE_SUBNET_GROUP_TEMPLATE = """{"DeleteDBSubnetGroupResponse": {"ResponseMetadata": {"RequestId": "13785dd5-a7fc-11e4-bb9c-7f371d0859b0"}}}"""
CREATE_OPTION_GROUP_TEMPLATE = """{
"CreateOptionGroupResponse": {
"CreateOptionGroupResult": {
"OptionGroup": {{ option_group.to_json() }}
},
"ResponseMetadata": {
"RequestId": "1e38dad4-9f50-11e4-87ea-a31c60ed2e36"
}
}
}"""
DELETE_OPTION_GROUP_TEMPLATE = \
"""{"DeleteOptionGroupResponse": {"ResponseMetadata": {"RequestId": "e2590367-9fa2-11e4-99cf-55e92d41c60e"}}}"""
DESCRIBE_OPTION_GROUP_TEMPLATE = \
"""{"DescribeOptionGroupsResponse": {
"DescribeOptionGroupsResult": {
"Marker": null,
"OptionGroupsList": [
{%- for option_group in option_groups -%}
{%- if loop.index != 1 -%},{%- endif -%}
{{ option_group.to_json() }}
{%- endfor -%}
]},
"ResponseMetadata": {"RequestId": "4caf445d-9fbc-11e4-87ea-a31c60ed2e36"}
}}"""
DESCRIBE_OPTION_GROUP_OPTIONS_TEMPLATE = \
"""{"DescribeOptionGroupOptionsResponse": {
"DescribeOptionGroupOptionsResult": {
"Marker": null,
"OptionGroupOptions": [
{%- for option_group_option in option_group_options -%}
{%- if loop.index != 1 -%},{%- endif -%}
{{ option_group_option.to_json() }}
{%- endfor -%}
]},
"ResponseMetadata": {"RequestId": "457f7bb8-9fbf-11e4-9084-5754f80d5144"}
}}"""
MODIFY_OPTION_GROUP_TEMPLATE = \
"""{"ModifyOptionGroupResponse": {
"ResponseMetadata": {
"RequestId": "ce9284a5-a0de-11e4-b984-a11a53e1f328"
},
"ModifyOptionGroupResult":
{{ option_group.to_json() }}
}
}"""
LIST_TAGS_FOR_RESOURCE_TEMPLATE = \
"""{"ListTagsForResourceResponse":
{"ListTagsForResourceResult":
{"TagList": [
{%- for tag in tags -%}
{%- if loop.index != 1 -%},{%- endif -%}
{
"Key": "{{ tag['Key'] }}",
"Value": "{{ tag['Value'] }}"
}
{%- endfor -%}
]},
"ResponseMetadata": {
"RequestId": "8c21ba39-a598-11e4-b688-194eaf8658fa"
}
}
}"""
ADD_TAGS_TO_RESOURCE_TEMPLATE = \
"""{"ListTagsForResourceResponse": {
"ListTagsForResourceResult": {
"TagList": [
{%- for tag in tags -%}
{%- if loop.index != 1 -%},{%- endif -%}
{
"Key": "{{ tag['Key'] }}",
"Value": "{{ tag['Value'] }}"
}
{%- endfor -%}
]},
"ResponseMetadata": {
"RequestId": "b194d9ca-a664-11e4-b688-194eaf8658fa"
}
}
}"""
REMOVE_TAGS_FROM_RESOURCE_TEMPLATE = \
"""{"RemoveTagsFromResourceResponse": {"ResponseMetadata": {"RequestId": "c6499a01-a664-11e4-8069-fb454b71a80e"}}}
"""

11
moto/rds2/urls.py Normal file
View File

@ -0,0 +1,11 @@
from __future__ import unicode_literals
from .responses import RDS2Response
url_bases = [
"https?://rds.(.+).amazonaws.com",
"https?://rds.amazonaws.com",
]
url_paths = {
'{0}/$': RDS2Response().dispatch,
}

View File

@ -5,7 +5,6 @@ from setuptools import setup, find_packages
install_requires = [ install_requires = [
"Jinja2", "Jinja2",
"boto", "boto",
"dicttoxml",
"flask", "flask",
"httpretty>=0.6.1", "httpretty>=0.6.1",
"requests", "requests",

View File

@ -257,3 +257,16 @@ def test_connecting_to_us_east_1():
database.master_username.should.equal("root") database.master_username.should.equal("root")
database.endpoint.should.equal(('db-master-1.aaaaaaaaaa.us-east-1.rds.amazonaws.com', 3306)) database.endpoint.should.equal(('db-master-1.aaaaaaaaaa.us-east-1.rds.amazonaws.com', 3306))
database.security_groups[0].name.should.equal('my_sg') database.security_groups[0].name.should.equal('my_sg')
@disable_on_py3()
@mock_rds
def test_create_database_with_iops():
conn = boto.rds.connect_to_region("us-west-2")
database = conn.create_dbinstance("db-master-1", 10, 'db.m1.small', 'root', 'hunter2', iops=6000)
database.status.should.equal('available')
database.iops.should.equal(6000)
# boto>2.36.0 may change the following property name to `storage_type`
database.StorageType.should.equal('io1')

View File

@ -0,0 +1,577 @@
from __future__ import unicode_literals
import boto.rds2
import boto.vpc
from boto.exception import BotoServerError
import sure # noqa
from moto import mock_ec2, mock_rds2
from tests.helpers import disable_on_py3
@disable_on_py3()
@mock_rds2
def test_create_database():
conn = boto.rds2.connect_to_region("us-west-2")
database = conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBInstanceStatus'].should.equal('available')
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBInstanceIdentifier'].should.equal("db-master-1")
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['AllocatedStorage'].should.equal('10')
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBInstanceClass'].should.equal("db.m1.small")
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['MasterUsername'].should.equal("root")
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBSecurityGroups'][0]['DBSecurityGroup']['DBSecurityGroupName'].should.equal('my_sg')
@disable_on_py3()
@mock_rds2
def test_get_databases():
conn = boto.rds2.connect_to_region("us-west-2")
instances = conn.describe_db_instances()
list(instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances']).should.have.length_of(0)
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
conn.create_db_instance(db_instance_identifier='db-master-2',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
instances = conn.describe_db_instances()
list(instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances']).should.have.length_of(2)
instances = conn.describe_db_instances("db-master-1")
list(instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances']).should.have.length_of(1)
instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['DBInstanceIdentifier'].should.equal("db-master-1")
@disable_on_py3()
@mock_rds2
def test_describe_non_existant_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.describe_db_instances.when.called_with("not-a-db").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_modify_db_instance():
conn = boto.rds2.connect_to_region("us-west-2")
database = conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
instances = conn.describe_db_instances('db-master-1')
instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['AllocatedStorage'].should.equal('10')
conn.modify_db_instance(db_instance_identifier='db-master-1', allocated_storage=20, apply_immediately=True)
instances = conn.describe_db_instances('db-master-1')
instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['AllocatedStorage'].should.equal('20')
@disable_on_py3()
@mock_rds2
def test_modify_non_existant_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.modify_db_instance.when.called_with(db_instance_identifier='not-a-db',
allocated_storage=20,
apply_immediately=True).should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_reboot_db_instance():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
database = conn.reboot_db_instance('db-master-1')
database['RebootDBInstanceResponse']['RebootDBInstanceResult']['DBInstance']['DBInstanceIdentifier'].should.equal("db-master-1")
@disable_on_py3()
@mock_rds2
def test_reboot_non_existant_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.reboot_db_instance.when.called_with("not-a-db").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_delete_database():
conn = boto.rds2.connect_to_region("us-west-2")
instances = conn.describe_db_instances()
list(instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances']).should.have.length_of(0)
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
instances = conn.describe_db_instances()
list(instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances']).should.have.length_of(1)
conn.delete_db_instance("db-master-1")
instances = conn.describe_db_instances()
list(instances['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances']).should.have.length_of(0)
@disable_on_py3()
@mock_rds2
def test_delete_non_existant_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.delete_db_instance.when.called_with("not-a-db").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_create_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
option_group = conn.create_option_group('test', 'mysql', '5.6', 'test option group')
option_group['CreateOptionGroupResponse']['CreateOptionGroupResult']['OptionGroup']['OptionGroupName'].should.equal('test')
option_group['CreateOptionGroupResponse']['CreateOptionGroupResult']['OptionGroup']['EngineName'].should.equal('mysql')
option_group['CreateOptionGroupResponse']['CreateOptionGroupResult']['OptionGroup']['OptionGroupDescription'].should.equal('test option group')
option_group['CreateOptionGroupResponse']['CreateOptionGroupResult']['OptionGroup']['MajorEngineVersion'].should.equal('5.6')
@disable_on_py3()
@mock_rds2
def test_create_option_group_bad_engine_name():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group.when.called_with('test', 'invalid_engine', '5.6', 'test invalid engine').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_create_option_group_bad_engine_major_version():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group.when.called_with('test', 'mysql', '6.6.6', 'test invalid engine version').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_create_option_group_empty_description():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group.when.called_with('test', 'mysql', '5.6', '').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_create_option_group_duplicate():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
conn.create_option_group.when.called_with('test', 'mysql', '5.6', 'foo').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_describe_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
option_groups = conn.describe_option_groups('test')
option_groups['DescribeOptionGroupsResponse']['DescribeOptionGroupsResult']['OptionGroupsList'][0]['OptionGroupName'].should.equal('test')
@disable_on_py3()
@mock_rds2
def test_describe_non_existant_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.describe_option_groups.when.called_with("not-a-option-group").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_delete_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
option_groups = conn.describe_option_groups('test')
option_groups['DescribeOptionGroupsResponse']['DescribeOptionGroupsResult']['OptionGroupsList'][0]['OptionGroupName'].should.equal('test')
conn.delete_option_group('test')
conn.describe_option_groups.when.called_with('test').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_delete_non_existant_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.delete_option_group.when.called_with('non-existant').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_describe_option_group_options():
conn = boto.rds2.connect_to_region("us-west-2")
option_group_options = conn.describe_option_group_options('sqlserver-ee')
len(option_group_options['DescribeOptionGroupOptionsResponse']['DescribeOptionGroupOptionsResult']['OptionGroupOptions']).should.equal(4)
option_group_options = conn.describe_option_group_options('sqlserver-ee', '11.00')
len(option_group_options['DescribeOptionGroupOptionsResponse']['DescribeOptionGroupOptionsResult']['OptionGroupOptions']).should.equal(2)
option_group_options = conn.describe_option_group_options('mysql', '5.6')
len(option_group_options['DescribeOptionGroupOptionsResponse']['DescribeOptionGroupOptionsResult']['OptionGroupOptions']).should.equal(1)
conn.describe_option_group_options.when.called_with('non-existent').should.throw(BotoServerError)
conn.describe_option_group_options.when.called_with('mysql', 'non-existent').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_modify_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
# TODO: create option and validate before deleting.
# if Someone can tell me how the hell to use this function
# to add options to an option_group, I can finish coding this.
result = conn.modify_option_group('test', [], ['MEMCACHED'], True)
result['ModifyOptionGroupResponse']['ModifyOptionGroupResult']['EngineName'].should.equal('mysql')
result['ModifyOptionGroupResponse']['ModifyOptionGroupResult']['Options'].should.equal([])
result['ModifyOptionGroupResponse']['ModifyOptionGroupResult']['OptionGroupName'].should.equal('test')
@disable_on_py3()
@mock_rds2
def test_modify_option_group_no_options():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
conn.modify_option_group.when.called_with('test').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_modify_non_existant_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.modify_option_group.when.called_with('non-existant', [('OptionName', 'Port', 'DBSecurityGroupMemberships', 'VpcSecurityGroupMemberships', 'OptionSettings')]).should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_delete_non_existant_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.delete_db_instance.when.called_with("not-a-db").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_list_tags_invalid_arn():
conn = boto.rds2.connect_to_region("us-west-2")
conn.list_tags_for_resource.when.called_with('arn:aws:rds:bad-arn').should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_list_tags_db():
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:db:foo')
result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList'].should.equal([])
conn.create_db_instance(db_instance_identifier='db-with-tags',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"],
tags=[('foo', 'bar'), ('foo1', 'bar1')])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:db:db-with-tags')
result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList'].should.equal([{'Value': 'bar',
'Key': 'foo'},
{'Value': 'bar1',
'Key': 'foo1'}])
@disable_on_py3()
@mock_rds2
def test_add_tags_db():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_instance(db_instance_identifier='db-without-tags',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"],
tags=[('foo', 'bar'), ('foo1', 'bar1')])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:db:db-without-tags')
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(2)
conn.add_tags_to_resource('arn:aws:rds:us-west-2:1234567890:db:db-without-tags',
[('foo', 'fish'), ('foo2', 'bar2')])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:db:db-without-tags')
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(3)
@disable_on_py3()
@mock_rds2
def test_remove_tags_db():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_instance(db_instance_identifier='db-with-tags',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"],
tags=[('foo', 'bar'), ('foo1', 'bar1')])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:db:db-with-tags')
len(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.equal(2)
conn.remove_tags_from_resource('arn:aws:rds:us-west-2:1234567890:db:db-with-tags', ['foo'])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:db:db-with-tags')
len(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.equal(1)
@disable_on_py3()
@mock_rds2
def test_add_tags_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:og:test')
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(0)
conn.add_tags_to_resource('arn:aws:rds:us-west-2:1234567890:og:test',
[('foo', 'fish'), ('foo2', 'bar2')])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:og:test')
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(2)
@disable_on_py3()
@mock_rds2
def test_remove_tags_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_option_group('test', 'mysql', '5.6', 'test option group')
conn.add_tags_to_resource('arn:aws:rds:us-west-2:1234567890:og:test',
[('foo', 'fish'), ('foo2', 'bar2')])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:og:test')
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(2)
conn.remove_tags_from_resource('arn:aws:rds:us-west-2:1234567890:og:test',
['foo'])
result = conn.list_tags_for_resource('arn:aws:rds:us-west-2:1234567890:og:test')
list(result['ListTagsForResourceResponse']['ListTagsForResourceResult']['TagList']).should.have.length_of(1)
@disable_on_py3()
@mock_rds2
def test_create_database_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.create_db_security_group('db_sg', 'DB Security Group')
result['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['DBSecurityGroupName'].should.equal("db_sg")
result['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['DBSecurityGroupDescription'].should.equal("DB Security Group")
result['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['IPRanges'].should.equal([])
@disable_on_py3()
@mock_rds2
def test_get_security_groups():
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(0)
conn.create_db_security_group('db_sg1', 'DB Security Group')
conn.create_db_security_group('db_sg2', 'DB Security Group')
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(2)
result = conn.describe_db_security_groups("db_sg1")
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(1)
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['DBSecurityGroupName'].should.equal("db_sg1")
@disable_on_py3()
@mock_rds2
def test_get_non_existant_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.describe_db_security_groups.when.called_with("not-a-sg").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_delete_database_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_security_group('db_sg', 'DB Security Group')
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(1)
conn.delete_db_security_group("db_sg")
result = conn.describe_db_security_groups()
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'].should.have.length_of(0)
@disable_on_py3()
@mock_rds2
def test_delete_non_existant_security_group():
conn = boto.rds2.connect_to_region("us-west-2")
conn.delete_db_security_group.when.called_with("not-a-db").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_security_group_authorize():
conn = boto.rds2.connect_to_region("us-west-2")
security_group = conn.create_db_security_group('db_sg', 'DB Security Group')
security_group['CreateDBSecurityGroupResponse']['CreateDBSecurityGroupResult']['DBSecurityGroup']['IPRanges'].should.equal([])
conn.authorize_db_security_group_ingress(db_security_group_name='db_sg',
cidrip='10.3.2.45/32')
result = conn.describe_db_security_groups("db_sg")
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.have.length_of(1)
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.equal(['10.3.2.45/32'])
conn.authorize_db_security_group_ingress(db_security_group_name='db_sg',
cidrip='10.3.2.46/32')
result = conn.describe_db_security_groups("db_sg")
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.have.length_of(2)
result['DescribeDBSecurityGroupsResponse']['DescribeDBSecurityGroupsResult']['DBSecurityGroups'][0]['IPRanges'].should.equal(['10.3.2.45/32', '10.3.2.46/32'])
@disable_on_py3()
@mock_rds2
def test_add_security_group_to_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2')
result = conn.describe_db_instances()
result['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['DBSecurityGroups'].should.equal([])
conn.create_db_security_group('db_sg', 'DB Security Group')
conn.modify_db_instance(db_instance_identifier='db-master-1',
db_security_groups=['db_sg'])
result = conn.describe_db_instances()
result['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['DBSecurityGroups'][0]['DBSecurityGroup']['DBSecurityGroupName'].should.equal('db_sg')
@disable_on_py3()
@mock_ec2
@mock_rds2
def test_create_database_subnet_group():
vpc_conn = boto.vpc.connect_to_region("us-west-2")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet1 = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
subnet2 = vpc_conn.create_subnet(vpc.id, "10.2.0.0/24")
subnet_ids = [subnet1.id, subnet2.id]
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.create_db_subnet_group("db_subnet", "my db subnet", subnet_ids)
result['CreateDBSubnetGroupResponse']['CreateDBSubnetGroupResult']['DBSubnetGroup']['DBSubnetGroupName'].should.equal("db_subnet")
result['CreateDBSubnetGroupResponse']['CreateDBSubnetGroupResult']['DBSubnetGroup']['DBSubnetGroupDescription'].should.equal("my db subnet")
subnets = result['CreateDBSubnetGroupResponse']['CreateDBSubnetGroupResult']['DBSubnetGroup']['Subnets']
subnet_group_ids = [subnets['Subnet'][0]['SubnetIdentifier'], subnets['Subnet'][1]['SubnetIdentifier']]
list(subnet_group_ids).should.equal(subnet_ids)
@disable_on_py3()
@mock_ec2
@mock_rds2
def test_create_database_in_subnet_group():
vpc_conn = boto.vpc.connect_to_region("us-west-2")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_subnet_group("db_subnet1", "my db subnet", [subnet.id])
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_subnet_group_name='db_subnet1')
result = conn.describe_db_instances("db-master-1")
result['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['DBSubnetGroup']['DBSubnetGroupName'].should.equal("db_subnet1")
@disable_on_py3()
@mock_ec2
@mock_rds2
def test_describe_database_subnet_group():
vpc_conn = boto.vpc.connect_to_region("us-west-2")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_subnet_group("db_subnet1", "my db subnet", [subnet.id])
conn.create_db_subnet_group("db_subnet2", "my db subnet", [subnet.id])
resp = conn.describe_db_subnet_groups()
groups_resp = resp['DescribeDBSubnetGroupsResponse']
subnet_groups = groups_resp['DescribeDBSubnetGroupsResult']['DBSubnetGroups']
subnet_groups.should.have.length_of(2)
subnets = groups_resp['DescribeDBSubnetGroupsResult']['DBSubnetGroups'][0]['DBSubnetGroup']['Subnets']
subnets.should.have.length_of(1)
list(resp).should.have.length_of(1)
list(groups_resp).should.have.length_of(2)
list(conn.describe_db_subnet_groups("db_subnet1")).should.have.length_of(1)
conn.describe_db_subnet_groups.when.called_with("not-a-subnet").should.throw(BotoServerError)
@disable_on_py3()
@mock_ec2
@mock_rds2
def test_delete_database_subnet_group():
vpc_conn = boto.vpc.connect_to_region("us-west-2")
vpc = vpc_conn.create_vpc("10.0.0.0/16")
subnet = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
conn = boto.rds2.connect_to_region("us-west-2")
result = conn.describe_db_subnet_groups()
result['DescribeDBSubnetGroupsResponse']['DescribeDBSubnetGroupsResult']['DBSubnetGroups'].should.have.length_of(0)
conn.create_db_subnet_group("db_subnet1", "my db subnet", [subnet.id])
result = conn.describe_db_subnet_groups()
result['DescribeDBSubnetGroupsResponse']['DescribeDBSubnetGroupsResult']['DBSubnetGroups'].should.have.length_of(1)
conn.delete_db_subnet_group("db_subnet1")
result = conn.describe_db_subnet_groups()
result['DescribeDBSubnetGroupsResponse']['DescribeDBSubnetGroupsResult']['DBSubnetGroups'].should.have.length_of(0)
conn.delete_db_subnet_group.when.called_with("db_subnet1").should.throw(BotoServerError)
@disable_on_py3()
@mock_rds2
def test_create_database_replica():
conn = boto.rds2.connect_to_region("us-west-2")
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
replica = conn.create_db_instance_read_replica("db-replica-1", "db-master-1", "db.m1.small")
replica['CreateDBInstanceReadReplicaResponse']['CreateDBInstanceReadReplicaResult']['DBInstance']['ReadReplicaSourceDBInstanceIdentifier'].should.equal('db-master-1')
replica['CreateDBInstanceReadReplicaResponse']['CreateDBInstanceReadReplicaResult']['DBInstance']['DBInstanceClass'].should.equal('db.m1.small')
replica['CreateDBInstanceReadReplicaResponse']['CreateDBInstanceReadReplicaResult']['DBInstance']['DBInstanceIdentifier'].should.equal('db-replica-1')
master = conn.describe_db_instances("db-master-1")
master['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['ReadReplicaDBInstanceIdentifiers'].should.equal(['db-replica-1'])
conn.delete_db_instance("db-replica-1")
master = conn.describe_db_instances("db-master-1")
master['DescribeDBInstancesResponse']['DescribeDBInstancesResult']['DBInstances'][0]['ReadReplicaDBInstanceIdentifiers'].should.equal([])

View File

@ -0,0 +1,20 @@
from __future__ import unicode_literals
import sure # noqa
import moto.server as server
from moto import mock_rds2
'''
Test the different server responses
'''
#@mock_rds2
#def test_list_databases():
# backend = server.create_backend_app("rds2")
# test_client = backend.test_client()
#
# res = test_client.get('/?Action=DescribeDBInstances')
#
# res.data.decode("utf-8").should.contain("<DescribeDBInstancesResult>")