building out new RDS2 Mock.

completed:
 * create_db_instance()
 * create_option_group()
This commit is contained in:
Mike Fuller 2015-01-19 07:38:10 +11:00
parent babb7450ec
commit 8c16517f10
8 changed files with 1105 additions and 0 deletions

View File

@ -13,6 +13,7 @@ from .emr import mock_emr # flake8: noqa
from .iam import mock_iam # flake8: noqa from .iam import mock_iam # flake8: noqa
from .kinesis import mock_kinesis # flake8: noqa from .kinesis import mock_kinesis # flake8: noqa
from .rds import mock_rds # flake8: noqa from .rds import mock_rds # flake8: noqa
from .rds2 import mock_rds2 # flake8: noqa
from .redshift import mock_redshift # flake8: noqa from .redshift import mock_redshift # flake8: noqa
from .s3 import mock_s3 # flake8: noqa from .s3 import mock_s3 # flake8: noqa
from .s3bucket_path import mock_s3bucket_path # flake8: noqa from .s3bucket_path import mock_s3bucket_path # flake8: noqa

12
moto/rds2/__init__.py Normal file
View File

@ -0,0 +1,12 @@
from __future__ import unicode_literals
from .models import rds2_backends
from ..core.models import MockAWS
rds2_backend = rds2_backends['us-west-1']
def mock_rds2(func=None):
if func:
return MockAWS(rds2_backends)(func)
else:
return MockAWS(rds2_backends)

38
moto/rds2/exceptions.py Normal file
View File

@ -0,0 +1,38 @@
from __future__ import unicode_literals
import json
from werkzeug.exceptions import BadRequest
class RDSClientError(BadRequest):
def __init__(self, code, message):
super(RDSClientError, self).__init__()
self.description = json.dumps({
"Error": {
"Code": code,
"Message": message,
'Type': 'Sender',
},
'RequestId': '6876f774-7273-11e4-85dc-39e55ca848d1',
})
class DBInstanceNotFoundError(RDSClientError):
def __init__(self, database_identifier):
super(DBInstanceNotFoundError, self).__init__(
'DBInstanceNotFound',
"Database {0} not found.".format(database_identifier))
class DBSecurityGroupNotFoundError(RDSClientError):
def __init__(self, security_group_name):
super(DBSecurityGroupNotFoundError, self).__init__(
'DBSecurityGroupNotFound',
"Security Group {0} not found.".format(security_group_name))
class DBSubnetGroupNotFoundError(RDSClientError):
def __init__(self, subnet_group_name):
super(DBSubnetGroupNotFoundError, self).__init__(
'DBSubnetGroupNotFound',
"Subnet Group {0} not found.".format(subnet_group_name))

476
moto/rds2/models.py Normal file
View File

@ -0,0 +1,476 @@
from __future__ import unicode_literals
import copy
import boto.rds2
from jinja2 import Template
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
from moto.core import BaseBackend
from moto.core.utils import get_random_hex
from moto.ec2.models import ec2_backends
from .exceptions import DBInstanceNotFoundError, DBSecurityGroupNotFoundError, DBSubnetGroupNotFoundError
class Database(object):
def __init__(self, **kwargs):
self.status = "available"
self.is_replica = False
self.replicas = []
self.region = kwargs.get('region')
self.engine = kwargs.get("engine")
self.engine_version = kwargs.get("engine_version", None)
self.default_engine_versions = {"MySQL": "5.6.21",
"mysql": "5.6.21",
"oracle-se1": "11.2.0.4.v3",
"oracle-se": "11.2.0.4.v3",
"oracle-ee": "11.2.0.4.v3",
"sqlserver-ee": "11.00.2100.60.v1",
"sqlserver-se": "11.00.2100.60.v1",
"sqlserver-ex": "11.00.2100.60.v1",
"sqlserver-web": "11.00.2100.60.v1",
"postgres": "9.3.3"
}
if not self.engine_version and self.engine in self.default_engine_versions:
self.engine_version = self.default_engine_versions[self.engine]
self.iops = kwargs.get("iops")
self.storage_type = kwargs.get("storage_type")
self.master_username = kwargs.get('master_username')
self.master_user_password = kwargs.get('master_user_password')
self.auto_minor_version_upgrade = kwargs.get('auto_minor_version_upgrade')
if self.auto_minor_version_upgrade is None:
self.auto_minor_version_upgrade = True
self.allocated_storage = kwargs.get('allocated_storage')
self.db_instance_identifier = kwargs.get('db_instance_identifier')
self.source_db_identifier = kwargs.get("source_db_identifier")
self.db_instance_class = kwargs.get('db_instance_class')
self.port = kwargs.get('port')
self.db_instance_identifier = kwargs.get('db_instance_identifier')
self.db_name = kwargs.get("db_name")
self.publicly_accessible = kwargs.get("publicly_accessible")
if self.publicly_accessible is None:
self.publicly_accessible = True
self.backup_retention_period = kwargs.get("backup_retention_period")
if self.backup_retention_period is None:
self.backup_retention_period = 1
self.availability_zone = kwargs.get("availability_zone")
self.multi_az = kwargs.get("multi_az")
self.db_subnet_group_name = kwargs.get("db_subnet_group_name")
if self.db_subnet_group_name:
self.db_subnet_group = rds2_backends[self.region].describe_subnet_groups(self.db_subnet_group_name)[0]
else:
self.db_subnet_group = []
self.db_security_groups = kwargs.get('security_groups', ['a'])
self.vpc_security_group_ids = kwargs.get('vpc_security_group_ids', [])
self.preferred_maintenance_window = kwargs.get('preferred_maintenance_window', 'wed:06:38-wed:07:08')
self.db_parameter_group_name = kwargs.get('db_parameter_group_name', None)
self.default_parameter_groups = {"MySQL": "default.mysql5.6",
"mysql": "default.mysql5.6",
"postgres": "default.postgres9.3"
}
if not self.db_parameter_group_name and self.engine in self.default_parameter_groups:
self.db_parameter_group_name = self.default_parameter_groups[self.engine]
self.preferred_backup_window = kwargs.get('preferred_backup_window', '13:14-13:44')
self.license_model = kwargs.get('license_model', 'general-public-license')
self.option_group_name = kwargs.get('option_group_name', None)
self.default_option_groups = {"MySQL": "default.mysql5.6",
"mysql": "default.mysql5.6",
"postgres": "default.postgres9.3"
}
if not self.option_group_name and self.engine in self.default_option_groups:
self.option_group_name = self.default_option_groups[self.engine]
self.character_set_name = kwargs.get('character_set_name', None)
self.tags = kwargs.get('tags', None)
@property
def address(self):
return "{0}.aaaaaaaaaa.{1}.rds.amazonaws.com".format(self.db_instance_identifier, self.region)
def add_replica(self, replica):
self.replicas.append(replica.db_instance_identifier)
def remove_replica(self, replica):
self.replicas.remove(replica.db_instance_identifier)
def set_as_replica(self):
self.is_replica = True
self.replicas = []
def update(self, db_kwargs):
for key, value in db_kwargs.items():
if value is not None:
setattr(self, key, value)
def get_cfn_attribute(self, attribute_name):
if attribute_name == 'Endpoint.Address':
return self.address
elif attribute_name == 'Endpoint.Port':
return self.port
raise UnformattedGetAttTemplateException()
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
db_instance_identifier = properties.get('DBInstanceIdentifier')
if not db_instance_identifier:
db_instance_identifier = resource_name.lower() + get_random_hex(12)
db_security_groups = properties.get('DBSecurityGroups')
if not db_security_groups:
db_security_groups = []
security_groups = [group.group_name for group in db_security_groups]
db_subnet_group = properties.get("DBSubnetGroupName")
db_subnet_group_name = db_subnet_group.subnet_name if db_subnet_group else None
db_kwargs = {
"auto_minor_version_upgrade": properties.get('AutoMinorVersionUpgrade'),
"allocated_storage": properties.get('AllocatedStorage'),
"availability_zone": properties.get("AvailabilityZone"),
"backup_retention_period": properties.get("BackupRetentionPeriod"),
"db_instance_class": properties.get('DBInstanceClass'),
"db_instance_identifier": db_instance_identifier,
"db_name": properties.get("DBName"),
"db_subnet_group_name": db_subnet_group_name,
"engine": properties.get("Engine"),
"engine_version": properties.get("EngineVersion"),
"iops": properties.get("Iops"),
"master_password": properties.get('MasterUserPassword'),
"master_username": properties.get('MasterUsername'),
"multi_az": properties.get("MultiAZ"),
"port": properties.get('Port', 3306),
"publicly_accessible": properties.get("PubliclyAccessible"),
"region": region_name,
"security_groups": security_groups,
"storage_type": properties.get("StorageType"),
}
rds2_backend = rds2_backends[region_name]
source_db_identifier = properties.get("SourceDBInstanceIdentifier")
if source_db_identifier:
# Replica
db_kwargs["source_db_identifier"] = source_db_identifier.db_instance_identifier
database = rds2_backend.create_database_replica(db_kwargs)
else:
database = rds2_backend.create_database(db_kwargs)
return database
def to_json(self):
template = Template(""""DBInstance": {
"AllocatedStorage": 10,
"AutoMinorVersionUpgrade": "{{ database.auto_minor_version_upgrade }}",
"AvailabilityZone": "{{ database.availability_zone }}",
"BackupRetentionPeriod": "{{ database.backup_retention_period }}",
"CharacterSetName": {%- if database.character_set_name -%}{{ database.character_set_name }}{%- else %} null{%- endif -%},
"DBInstanceClass": "{{ database.db_instance_class }}",
"DBInstanceIdentifier": "{{ database.db_instance_identifier }}",
"DBInstanceStatus": "{{ database.status }}",
"DBName": {%- if database.db_name -%}{{ database.db_name }}{%- else %} null{%- endif -%},
{% if database.db_parameter_group_name -%}"DBParameterGroups": {
"DBParameterGroup": {
"ParameterApplyStatus": "in-sync",
"DBParameterGroupName": "{{ database.db_parameter_group_name }}"
}
},{%- endif %}
"DBSecurityGroups": [{
{% for security_group in database.db_security_groups -%}{%- if loop.index != 1 -%},{%- endif -%}
"DBSecurityGroup": {
"Status": "active",
"DBSecurityGroupName": "{{ security_group }}"
}{% endfor %}
}],{%- if database.db_subnet_group -%}
"DBSubnetGroup": {
"DBSubnetGroupDescription": "nabil-db-subnet-group",
"DBSubnetGroupName": "nabil-db-subnet-group",
"SubnetGroupStatus": "Complete",
"Subnets": [
{
"SubnetAvailabilityZone": {
"Name": "us-west-2c",
"ProvisionedIopsCapable": false
},
"SubnetIdentifier": "subnet-c0ea0099",
"SubnetStatus": "Active"
},
{
"SubnetAvailabilityZone": {
"Name": "us-west-2a",
"ProvisionedIopsCapable": false
},
"SubnetIdentifier": "subnet-ff885d88",
"SubnetStatus": "Active"
}
],
"VpcId": "vpc-8e6ab6eb"
},{%- endif %}
"Engine": "{{ database.engine }}",
"EngineVersion": "{{ database.engine_version }}",
"LatestRestorableTime": null,
"LicenseModel": "{{ database.license_model }}",
"MasterUsername": "{{ database.master_username }}",
"MultiAZ": "{{ database.multi_az }}",{% if database.option_group_name %}
"OptionGroupMemberships": [{
"OptionGroupMembership": {
"OptionGroupName": "{{ database.option_group_name }}",
"Status": "in-sync"
}
}],{%- endif %}
"PendingModifiedValues": { "MasterUserPassword": "****" },
"PreferredBackupWindow": "{{ database.preferred_backup_window }}",
"PreferredMaintenanceWindow": "{{ database.preferred_maintenance_window }}",
"PubliclyAccessible": "{{ database.publicly_accessible }}",
"AllocatedStorage": "{{ database.allocated_storage }}",
"Endpoint": null,
"InstanceCreateTime": null,
"Iops": null,
"ReadReplicaDBInstanceIdentifiers": [],
"ReadReplicaSourceDBInstanceIdentifier": null,
"SecondaryAvailabilityZone": null,
"StatusInfos": null,
"VpcSecurityGroups": [
{
"Status": "active",
"VpcSecurityGroupId": "sg-123456"
}
]
}""")
return template.render(database=self)
class SecurityGroup(object):
def __init__(self, group_name, description):
self.group_name = group_name
self.description = description
self.status = "authorized"
self.ip_ranges = []
self.ec2_security_groups = []
def to_xml(self):
template = Template("""<DBSecurityGroup>
<EC2SecurityGroups>
{% for security_group in security_group.ec2_security_groups %}
<EC2SecurityGroup>
<EC2SecurityGroupId>{{ security_group.id }}</EC2SecurityGroupId>
<EC2SecurityGroupName>{{ security_group.name }}</EC2SecurityGroupName>
<EC2SecurityGroupOwnerId>{{ security_group.owner_id }}</EC2SecurityGroupOwnerId>
<Status>authorized</Status>
</EC2SecurityGroup>
{% endfor %}
</EC2SecurityGroups>
<DBSecurityGroupDescription>{{ security_group.description }}</DBSecurityGroupDescription>
<IPRanges>
{% for ip_range in security_group.ip_ranges %}
<IPRange>
<CIDRIP>{{ ip_range }}</CIDRIP>
<Status>authorized</Status>
</IPRange>
{% endfor %}
</IPRanges>
<OwnerId>{{ security_group.ownder_id }}</OwnerId>
<DBSecurityGroupName>{{ security_group.group_name }}</DBSecurityGroupName>
</DBSecurityGroup>""")
return template.render(security_group=self)
def authorize_cidr(self, cidr_ip):
self.ip_ranges.append(cidr_ip)
def authorize_security_group(self, security_group):
self.ec2_security_groups.append(security_group)
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
group_name = resource_name.lower() + get_random_hex(12)
description = properties['GroupDescription']
security_group_ingress = properties['DBSecurityGroupIngress']
ec2_backend = ec2_backends[region_name]
rds2_backend = rds2_backends[region_name]
security_group = rds2_backend.create_security_group(
group_name,
description,
)
for ingress_type, ingress_value in security_group_ingress.items():
if ingress_type == "CIDRIP":
security_group.authorize_cidr(ingress_value)
elif ingress_type == "EC2SecurityGroupName":
subnet = ec2_backend.get_security_group_from_name(ingress_value)
security_group.authorize_security_group(subnet)
elif ingress_type == "EC2SecurityGroupId":
subnet = ec2_backend.get_security_group_from_id(ingress_value)
security_group.authorize_security_group(subnet)
return security_group
class SubnetGroup(object):
def __init__(self, subnet_name, description, subnets):
self.subnet_name = subnet_name
self.description = description
self.subnets = subnets
self.status = "Complete"
self.vpc_id = self.subnets[0].vpc_id
def to_xml(self):
template = Template("""<DBSubnetGroup>
<VpcId>{{ subnet_group.vpc_id }}</VpcId>
<SubnetGroupStatus>{{ subnet_group.status }}</SubnetGroupStatus>
<DBSubnetGroupDescription>{{ subnet_group.description }}</DBSubnetGroupDescription>
<DBSubnetGroupName>{{ subnet_group.subnet_name }}</DBSubnetGroupName>
<Subnets>
{% for subnet in subnet_group.subnets %}
<Subnet>
<SubnetStatus>Active</SubnetStatus>
<SubnetIdentifier>{{ subnet.id }}</SubnetIdentifier>
<SubnetAvailabilityZone>
<Name>{{ subnet.availability_zone }}</Name>
<ProvisionedIopsCapable>false</ProvisionedIopsCapable>
</SubnetAvailabilityZone>
</Subnet>
{% endfor %}
</Subnets>
</DBSubnetGroup>""")
return template.render(subnet_group=self)
@classmethod
def create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):
properties = cloudformation_json['Properties']
subnet_name = resource_name.lower() + get_random_hex(12)
description = properties['DBSubnetGroupDescription']
subnet_ids = properties['SubnetIds']
ec2_backend = ec2_backends[region_name]
subnets = [ec2_backend.get_subnet(subnet_id) for subnet_id in subnet_ids]
rds2_backend = rds2_backends[region_name]
subnet_group = rds2_backend.create_subnet_group(
subnet_name,
description,
subnets,
)
return subnet_group
class RDS2Backend(BaseBackend):
def __init__(self):
self.databases = {}
self.security_groups = {}
self.subnet_groups = {}
self.option_groups = {}
def create_database(self, db_kwargs):
database_id = db_kwargs['db_instance_identifier']
database = Database(**db_kwargs)
self.databases[database_id] = database
return database
def create_database_replica(self, db_kwargs):
database_id = db_kwargs['db_instance_identifier']
source_database_id = db_kwargs['source_db_identifier']
primary = self.describe_databases(source_database_id)[0]
replica = copy.deepcopy(primary)
replica.update(db_kwargs)
replica.set_as_replica()
self.databases[database_id] = replica
primary.add_replica(replica)
return replica
def describe_databases(self, db_instance_identifier=None):
if db_instance_identifier:
if db_instance_identifier in self.databases:
return [self.databases[db_instance_identifier]]
else:
raise DBInstanceNotFoundError(db_instance_identifier)
return self.databases.values()
def modify_database(self, db_instance_identifier, db_kwargs):
database = self.describe_databases(db_instance_identifier)[0]
database.update(db_kwargs)
return database
def delete_database(self, db_instance_identifier):
if db_instance_identifier in self.databases:
database = self.databases.pop(db_instance_identifier)
if database.is_replica:
primary = self.describe_databases(database.source_db_identifier)[0]
primary.remove_replica(database)
return database
else:
raise DBInstanceNotFoundError(db_instance_identifier)
def create_security_group(self, group_name, description):
security_group = SecurityGroup(group_name, description)
self.security_groups[group_name] = security_group
return security_group
def describe_security_groups(self, security_group_name):
if security_group_name:
if security_group_name in self.security_groups:
return [self.security_groups[security_group_name]]
else:
raise DBSecurityGroupNotFoundError(security_group_name)
return self.security_groups.values()
def delete_security_group(self, security_group_name):
if security_group_name in self.security_groups:
return self.security_groups.pop(security_group_name)
else:
raise DBSecurityGroupNotFoundError(security_group_name)
def authorize_security_group(self, security_group_name, cidr_ip):
security_group = self.describe_security_groups(security_group_name)[0]
security_group.authorize_cidr(cidr_ip)
return security_group
def create_subnet_group(self, subnet_name, description, subnets):
subnet_group = SubnetGroup(subnet_name, description, subnets)
self.subnet_groups[subnet_name] = subnet_group
return subnet_group
def describe_subnet_groups(self, subnet_group_name):
if subnet_group_name:
if subnet_group_name in self.subnet_groups:
return [self.subnet_groups[subnet_group_name]]
else:
raise DBSubnetGroupNotFoundError(subnet_group_name)
return self.subnet_groups.values()
def delete_subnet_group(self, subnet_name):
if subnet_name in self.subnet_groups:
return self.subnet_groups.pop(subnet_name)
else:
raise DBSubnetGroupNotFoundError(subnet_name)
def create_option_group(self, option_group_kwargs):
option_group_id = option_group_kwargs['name']
option_group = OptionGroup(**option_group_kwargs)
self.option_groups[option_group_id] = option_group
return option_group
class OptionGroup(object):
def __init__(self, name, engine_name, major_engine_version, description):
self.engine_name = engine_name
self.major_engine_version = major_engine_version
self.description = description
self.name = name
self.vpc_and_non_vpc_instance_memberships = False
self.options = {}
self.vpcId = 'null'
def to_json(self):
template = Template("""{
"VpcId": null,
"MajorEngineVersion": "{{ option_group.engine_name }}",
"OptionGroupDescription": "{{ option_group.description }}",
"AllowsVpcAndNonVpcInstanceMemberships": "{{ option_group.vpc_and_non_vpc_instance_memberships }}",
"EngineName": "{{ option_group.engine_name }}",
"Options": [],
"OptionGroupName": "{{ option_group.name }}"
}""")
return template.render(option_group=self)
rds2_backends = {}
for region in boto.rds2.regions():
rds2_backends[region.name] = RDS2Backend()

289
moto/rds2/responses.py Normal file
View File

@ -0,0 +1,289 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backends
from .models import rds2_backends
class RDS2Response(BaseResponse):
@property
def backend(self):
return rds2_backends[self.region]
def _get_db_kwargs(self):
return {
"auto_minor_version_upgrade": self._get_param('AutoMinorVersionUpgrade'),
"allocated_storage": self._get_int_param('AllocatedStorage'),
"availability_zone": self._get_param("AvailabilityZone"),
"backup_retention_period": self._get_param("BackupRetentionPeriod"),
"db_instance_class": self._get_param('DBInstanceClass'),
"db_instance_identifier": self._get_param('DBInstanceIdentifier'),
"db_name": self._get_param("DBName"),
# DBParameterGroupName
"db_subnet_group_name": self._get_param("DBSubnetGroupName"),
"engine": self._get_param("Engine"),
"engine_version": self._get_param("EngineVersion"),
"iops": self._get_int_param("Iops"),
"master_password": self._get_param('MasterUserPassword'),
"master_username": self._get_param('MasterUsername'),
"multi_az": self._get_bool_param("MultiAZ"),
# OptionGroupName
"port": self._get_param('Port'),
# PreferredBackupWindow
# PreferredMaintenanceWindow
"publicly_accessible": self._get_param("PubliclyAccessible"),
"region": self.region,
"security_groups": self._get_multi_param('DBSecurityGroups.member'),
"storage_type": self._get_param("StorageType"),
# VpcSecurityGroupIds.member.N
}
def _get_db_replica_kwargs(self):
return {
"auto_minor_version_upgrade": self._get_param('AutoMinorVersionUpgrade'),
"availability_zone": self._get_param("AvailabilityZone"),
"db_instance_class": self._get_param('DBInstanceClass'),
"db_instance_identifier": self._get_param('DBInstanceIdentifier'),
"db_subnet_group_name": self._get_param("DBSubnetGroupName"),
"iops": self._get_int_param("Iops"),
# OptionGroupName
"port": self._get_param('Port'),
"publicly_accessible": self._get_param("PubliclyAccessible"),
"source_db_identifier": self._get_param('SourceDBInstanceIdentifier'),
"storage_type": self._get_param("StorageType"),
}
def _get_option_group_kwargs(self):
return {
'major_engine_version': self._get_param('MajorEngineVersion'),
'description': self._get_param('OptionGroupDescription'),
'engine_name': self._get_param('EngineName'),
'name': self._get_param('OptionGroupName')
}
def create_dbinstance(self):
return self.create_db_instance()
def create_db_instance(self):
db_kwargs = self._get_db_kwargs()
database = self.backend.create_database(db_kwargs)
template = self.response_template(CREATE_DATABASE_TEMPLATE)
result = template.render(database=database)
return result
# TODO: Update function to new method
def create_dbinstance_read_replica(self):
db_kwargs = self._get_db_replica_kwargs()
database = self.backend.create_database_replica(db_kwargs)
template = self.response_template(CREATE_DATABASE_REPLICA_TEMPLATE)
return template.render(database=database)
def describe_dbinstances(self):
return self.describe_db_instances()
def describe_dbinstances(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
databases = self.backend.describe_databases(db_instance_identifier)
template = self.response_template(DESCRIBE_DATABASES_TEMPLATE)
return template.render(databases=databases)
# TODO: Update function to new method
def modify_dbinstance(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
db_kwargs = self._get_db_kwargs()
database = self.backend.modify_database(db_instance_identifier, db_kwargs)
template = self.response_template(MODIFY_DATABASE_TEMPLATE)
return template.render(database=database)
# TODO: Update function to new method
def delete_dbinstance(self):
db_instance_identifier = self._get_param('DBInstanceIdentifier')
database = self.backend.delete_database(db_instance_identifier)
template = self.response_template(DELETE_DATABASE_TEMPLATE)
return template.render(database=database)
# TODO: Update function to new method
def create_dbsecurity_group(self):
group_name = self._get_param('DBSecurityGroupName')
description = self._get_param('DBSecurityGroupDescription')
security_group = self.backend.create_security_group(group_name, description)
template = self.response_template(CREATE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
# TODO: Update function to new method
def describe_dbsecurity_groups(self):
security_group_name = self._get_param('DBSecurityGroupName')
security_groups = self.backend.describe_security_groups(security_group_name)
template = self.response_template(DESCRIBE_SECURITY_GROUPS_TEMPLATE)
return template.render(security_groups=security_groups)
# TODO: Update function to new method
def delete_dbsecurity_group(self):
security_group_name = self._get_param('DBSecurityGroupName')
security_group = self.backend.delete_security_group(security_group_name)
template = self.response_template(DELETE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
# TODO: Update function to new method
def authorize_dbsecurity_group_ingress(self):
security_group_name = self._get_param('DBSecurityGroupName')
cidr_ip = self._get_param('CIDRIP')
security_group = self.backend.authorize_security_group(security_group_name, cidr_ip)
template = self.response_template(AUTHORIZE_SECURITY_GROUP_TEMPLATE)
return template.render(security_group=security_group)
# TODO: Update function to new method
def create_dbsubnet_group(self):
subnet_name = self._get_param('DBSubnetGroupName')
description = self._get_param('DBSubnetGroupDescription')
subnet_ids = self._get_multi_param('SubnetIds.member')
subnets = [ec2_backends[self.region].get_subnet(subnet_id) for subnet_id in subnet_ids]
subnet_group = self.backend.create_subnet_group(subnet_name, description, subnets)
template = self.response_template(CREATE_SUBNET_GROUP_TEMPLATE)
return template.render(subnet_group=subnet_group)
# TODO: Update function to new method
def describe_dbsubnet_groups(self):
subnet_name = self._get_param('DBSubnetGroupName')
subnet_groups = self.backend.describe_subnet_groups(subnet_name)
template = self.response_template(DESCRIBE_SUBNET_GROUPS_TEMPLATE)
return template.render(subnet_groups=subnet_groups)
# TODO: Update function to new method
def delete_dbsubnet_group(self):
subnet_name = self._get_param('DBSubnetGroupName')
subnet_group = self.backend.delete_subnet_group(subnet_name)
template = self.response_template(DELETE_SUBNET_GROUP_TEMPLATE)
return template.render(subnet_group=subnet_group)
def create_option_group(self):
kwargs = self._get_option_group_kwargs()
option_group = self.backend.create_option_group(kwargs)
template = self.response_template(CREATE_OPTION_GROUP_TEMPLATE)
return template.render(option_group=option_group)
CREATE_DATABASE_TEMPLATE = """{
"CreateDBInstanceResponse": {
"CreateDBInstanceResult": {
{{ database.to_json() }}
},
"ResponseMetadata": { "RequestId": "523e3218-afc7-11c3-90f5-f90431260ab4" }
}
}"""
CREATE_DATABASE_REPLICA_TEMPLATE = """<CreateDBInstanceReadReplicaResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBInstanceReadReplicaResult>
{{ database.to_xml() }}
</CreateDBInstanceReadReplicaResult>
<ResponseMetadata>
<RequestId>ba8dedf0-bb9a-11d3-855b-576787000e19</RequestId>
</ResponseMetadata>
</CreateDBInstanceReadReplicaResponse>"""
DESCRIBE_DATABASES_TEMPLATE = """{
"DescribeDBInstanceResponse": {
"DescribeDBInstanceResult": [
{%- for database in databases -%}
{%- if loop.index != 1 -%},{%- endif -%}
{ {{ database.to_json() }} }
{%- endfor -%}
],
"ResponseMetadata": { "RequestId": "523e3218-afc7-11c3-90f5-f90431260ab4" }
}
}"""
MODIFY_DATABASE_TEMPLATE = """<ModifyDBInstanceResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<ModifyDBInstanceResult>
{{ database.to_xml() }}
</ModifyDBInstanceResult>
<ResponseMetadata>
<RequestId>f643f1ac-bbfe-11d3-f4c6-37db295f7674</RequestId>
</ResponseMetadata>
</ModifyDBInstanceResponse>"""
DELETE_DATABASE_TEMPLATE = """<DeleteDBInstanceResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DeleteDBInstanceResult>
{{ database.to_xml() }}
</DeleteDBInstanceResult>
<ResponseMetadata>
<RequestId>7369556f-b70d-11c3-faca-6ba18376ea1b</RequestId>
</ResponseMetadata>
</DeleteDBInstanceResponse>"""
CREATE_SECURITY_GROUP_TEMPLATE = """<CreateDBSecurityGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBSecurityGroupResult>
{{ security_group.to_xml() }}
</CreateDBSecurityGroupResult>
<ResponseMetadata>
<RequestId>e68ef6fa-afc1-11c3-845a-476777009d19</RequestId>
</ResponseMetadata>
</CreateDBSecurityGroupResponse>"""
DESCRIBE_SECURITY_GROUPS_TEMPLATE = """<DescribeDBSecurityGroupsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DescribeDBSecurityGroupsResult>
<DBSecurityGroups>
{% for security_group in security_groups %}
{{ security_group.to_xml() }}
{% endfor %}
</DBSecurityGroups>
</DescribeDBSecurityGroupsResult>
<ResponseMetadata>
<RequestId>b76e692c-b98c-11d3-a907-5a2c468b9cb0</RequestId>
</ResponseMetadata>
</DescribeDBSecurityGroupsResponse>"""
DELETE_SECURITY_GROUP_TEMPLATE = """<DeleteDBSecurityGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<ResponseMetadata>
<RequestId>7aec7454-ba25-11d3-855b-576787000e19</RequestId>
</ResponseMetadata>
</DeleteDBSecurityGroupResponse>"""
AUTHORIZE_SECURITY_GROUP_TEMPLATE = """<AuthorizeDBSecurityGroupIngressResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<AuthorizeDBSecurityGroupIngressResult>
{{ security_group.to_xml() }}
</AuthorizeDBSecurityGroupIngressResult>
<ResponseMetadata>
<RequestId>6176b5f8-bfed-11d3-f92b-31fa5e8dbc99</RequestId>
</ResponseMetadata>
</AuthorizeDBSecurityGroupIngressResponse>"""
CREATE_SUBNET_GROUP_TEMPLATE = """<CreateDBSubnetGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<CreateDBSubnetGroupResult>
{{ subnet_group.to_xml() }}
</CreateDBSubnetGroupResult>
<ResponseMetadata>
<RequestId>3a401b3f-bb9e-11d3-f4c6-37db295f7674</RequestId>
</ResponseMetadata>
</CreateDBSubnetGroupResponse>"""
DESCRIBE_SUBNET_GROUPS_TEMPLATE = """<DescribeDBSubnetGroupsResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<DescribeDBSubnetGroupsResult>
<DBSubnetGroups>
{% for subnet_group in subnet_groups %}
{{ subnet_group.to_xml() }}
{% endfor %}
</DBSubnetGroups>
</DescribeDBSubnetGroupsResult>
<ResponseMetadata>
<RequestId>b783db3b-b98c-11d3-fbc7-5c0aad74da7c</RequestId>
</ResponseMetadata>
</DescribeDBSubnetGroupsResponse>"""
DELETE_SUBNET_GROUP_TEMPLATE = """<DeleteDBSubnetGroupResponse xmlns="http://rds.amazonaws.com/doc/2014-09-01/">
<ResponseMetadata>
<RequestId>6295e5ab-bbf3-11d3-f4c6-37db295f7674</RequestId>
</ResponseMetadata>
</DeleteDBSubnetGroupResponse>"""
CREATE_OPTION_GROUP_TEMPLATE = """{
"CreateOptionGroupResponse": {
"CreateOptionGroupResult": {
"OptionGroup": {{ option_group.to_json() }}
},
"ResponseMetadata": {
"RequestId": "1e38dad4-9f50-11e4-87ea-a31c60ed2e36"
}
}
}"""

10
moto/rds2/urls.py Normal file
View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from .responses import RDS2Response
url_bases = [
"https?://rds.(.+).amazonaws.com",
]
url_paths = {
'{0}/$': RDS2Response().dispatch,
}

View File

@ -0,0 +1,259 @@
from __future__ import unicode_literals
import boto.rds2
import boto.vpc
from boto.exception import BotoServerError
import sure # noqa
from moto import mock_ec2, mock_rds2
from tests.helpers import disable_on_py3
@disable_on_py3()
@mock_rds2
def test_create_database():
conn = boto.rds2.connect_to_region("us-west-2")
database = conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBInstanceStatus'].should.equal('available')
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBInstanceIdentifier'].should.equal("db-master-1")
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['AllocatedStorage'].should.equal('10')
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBInstanceClass'].should.equal("db.m1.small")
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['MasterUsername'].should.equal("root")
database['CreateDBInstanceResponse']['CreateDBInstanceResult']['DBInstance']['DBSecurityGroups'][0]['DBSecurityGroup']['DBSecurityGroupName'].should.equal('my_sg')
@disable_on_py3()
@mock_rds2
def test_get_databases():
conn = boto.rds2.connect_to_region("us-west-2")
instances = conn.describe_db_instances()
list(instances['DescribeDBInstanceResponse']['DescribeDBInstanceResult']).should.have.length_of(0)
conn.create_db_instance(db_instance_identifier='db-master-1',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
conn.create_db_instance(db_instance_identifier='db-master-2',
allocated_storage=10,
engine='postgres',
db_instance_class='db.m1.small',
master_username='root',
master_user_password='hunter2',
db_security_groups=["my_sg"])
instances = conn.describe_db_instances()
list(instances['DescribeDBInstanceResponse']['DescribeDBInstanceResult']).should.have.length_of(2)
instances = conn.describe_db_instances("db-master-1")
list(instances['DescribeDBInstanceResponse']['DescribeDBInstanceResult']).should.have.length_of(1)
instances['DescribeDBInstanceResponse']['DescribeDBInstanceResult'][0]['DBInstance']['DBInstanceIdentifier'].should.equal("db-master-1")
@mock_rds2
def test_describe_non_existant_database():
conn = boto.rds2.connect_to_region("us-west-2")
conn.describe_db_instances.when.called_with("not-a-db").should.throw(BotoServerError)
@mock_rds2
def test_create_option_group():
conn = boto.rds2.connect_to_region("us-west-2")
print conn.create_option_group('test', 'postgres', '9.3', 'test')
#@disable_on_py3()
#@mock_rds2
#def test_delete_database():
# conn = boto.rds2.connect_to_region("us-west-2")
# list(conn.get_all_dbinstances()).should.have.length_of(0)
#
# conn.create_dbinstance("db-master-1", 10, 'db.m1.small', 'root', 'hunter2')
# list(conn.get_all_dbinstances()).should.have.length_of(1)
#
# conn.delete_dbinstance("db-master-1")
# list(conn.get_all_dbinstances()).should.have.length_of(0)
#
#
#@mock_rds2
#def test_delete_non_existant_database():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.delete_dbinstance.when.called_with("not-a-db").should.throw(BotoServerError)
#@mock_rds2
#def test_create_database_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
#
# security_group = conn.create_dbsecurity_group('db_sg', 'DB Security Group')
# security_group.name.should.equal('db_sg')
# security_group.description.should.equal("DB Security Group")
# list(security_group.ip_ranges).should.equal([])
#
#
#@mock_rds2
#def test_get_security_groups():
# conn = boto.rds2.connect_to_region("us-west-2")
#
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(0)
#
# conn.create_dbsecurity_group('db_sg1', 'DB Security Group')
# conn.create_dbsecurity_group('db_sg2', 'DB Security Group')
#
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(2)
#
# databases = conn.get_all_dbsecurity_groups("db_sg1")
# list(databases).should.have.length_of(1)
#
# databases[0].name.should.equal("db_sg1")
#
#
#@mock_rds2
#def test_get_non_existant_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.get_all_dbsecurity_groups.when.called_with("not-a-sg").should.throw(BotoServerError)
#
#
#@mock_rds2
#def test_delete_database_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.create_dbsecurity_group('db_sg', 'DB Security Group')
#
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(1)
#
# conn.delete_dbsecurity_group("db_sg")
# list(conn.get_all_dbsecurity_groups()).should.have.length_of(0)
#
#
#@mock_rds2
#def test_delete_non_existant_security_group():
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.delete_dbsecurity_group.when.called_with("not-a-db").should.throw(BotoServerError)
#
#
#@disable_on_py3()
#@mock_rds2
#def test_security_group_authorize():
# conn = boto.rds2.connect_to_region("us-west-2")
# security_group = conn.create_dbsecurity_group('db_sg', 'DB Security Group')
# list(security_group.ip_ranges).should.equal([])
#
# security_group.authorize(cidr_ip='10.3.2.45/32')
# security_group = conn.get_all_dbsecurity_groups()[0]
# list(security_group.ip_ranges).should.have.length_of(1)
# security_group.ip_ranges[0].cidr_ip.should.equal('10.3.2.45/32')
#
#
#@disable_on_py3()
#@mock_rds2
#def test_add_security_group_to_database():
# conn = boto.rds2.connect_to_region("us-west-2")
#
# database = conn.create_dbinstance("db-master-1", 10, 'db.m1.small', 'root', 'hunter2')
# security_group = conn.create_dbsecurity_group('db_sg', 'DB Security Group')
# database.modify(security_groups=[security_group])
#
# database = conn.get_all_dbinstances()[0]
# list(database.security_groups).should.have.length_of(1)
#
# database.security_groups[0].name.should.equal("db_sg")
#
#
#@mock_ec2
#@mock_rds2
#def test_add_database_subnet_group():
# vpc_conn = boto.vpc.connect_to_region("us-west-2")
# vpc = vpc_conn.create_vpc("10.0.0.0/16")
# subnet1 = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
# subnet2 = vpc_conn.create_subnet(vpc.id, "10.2.0.0/24")
#
# subnet_ids = [subnet1.id, subnet2.id]
# conn = boto.rds2.connect_to_region("us-west-2")
# subnet_group = conn.create_db_subnet_group("db_subnet", "my db subnet", subnet_ids)
# subnet_group.name.should.equal('db_subnet')
# subnet_group.description.should.equal("my db subnet")
# list(subnet_group.subnet_ids).should.equal(subnet_ids)
#
#
#@mock_ec2
#@mock_rds2
#def test_describe_database_subnet_group():
# vpc_conn = boto.vpc.connect_to_region("us-west-2")
# vpc = vpc_conn.create_vpc("10.0.0.0/16")
# subnet = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
#
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.create_db_subnet_group("db_subnet1", "my db subnet", [subnet.id])
# conn.create_db_subnet_group("db_subnet2", "my db subnet", [subnet.id])
#
# list(conn.get_all_db_subnet_groups()).should.have.length_of(2)
# list(conn.get_all_db_subnet_groups("db_subnet1")).should.have.length_of(1)
#
# conn.get_all_db_subnet_groups.when.called_with("not-a-subnet").should.throw(BotoServerError)
#
#
#@mock_ec2
#@mock_rds2
#def test_delete_database_subnet_group():
# vpc_conn = boto.vpc.connect_to_region("us-west-2")
# vpc = vpc_conn.create_vpc("10.0.0.0/16")
# subnet = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
#
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.create_db_subnet_group("db_subnet1", "my db subnet", [subnet.id])
# list(conn.get_all_db_subnet_groups()).should.have.length_of(1)
#
# conn.delete_db_subnet_group("db_subnet1")
# list(conn.get_all_db_subnet_groups()).should.have.length_of(0)
#
# conn.delete_db_subnet_group.when.called_with("db_subnet1").should.throw(BotoServerError)
#
#
#@disable_on_py3()
#@mock_ec2
#@mock_rds2
#def test_create_database_in_subnet_group():
# vpc_conn = boto.vpc.connect_to_region("us-west-2")
# vpc = vpc_conn.create_vpc("10.0.0.0/16")
# subnet = vpc_conn.create_subnet(vpc.id, "10.1.0.0/24")
#
# conn = boto.rds2.connect_to_region("us-west-2")
# conn.create_db_subnet_group("db_subnet1", "my db subnet", [subnet.id])
#
# database = conn.create_dbinstance("db-master-1", 10, 'db.m1.small',
# 'root', 'hunter2', db_subnet_group_name="db_subnet1")
#
# database = conn.get_all_dbinstances("db-master-1")[0]
# database.subnet_group.name.should.equal("db_subnet1")
#
#
#@disable_on_py3()
#@mock_rds2
#def test_create_database_replica():
# conn = boto.rds2.connect_to_region("us-west-2")
#
# primary = conn.create_dbinstance("db-master-1", 10, 'db.m1.small', 'root', 'hunter2')
#
# replica = conn.create_dbinstance_read_replica("replica", "db-master-1", "db.m1.small")
# replica.id.should.equal("replica")
# replica.instance_class.should.equal("db.m1.small")
# status_info = replica.status_infos[0]
# status_info.normal.should.equal(True)
# status_info.status_type.should.equal('read replication')
# status_info.status.should.equal('replicating')
#
# primary = conn.get_all_dbinstances("db-master-1")[0]
# primary.read_replica_dbinstance_identifiers[0].should.equal("replica")
#
# conn.delete_dbinstance("replica")
#
# primary = conn.get_all_dbinstances("db-master-1")[0]
# list(primary.read_replica_dbinstance_identifiers).should.have.length_of(0)

View File

@ -0,0 +1,20 @@
from __future__ import unicode_literals
import sure # noqa
import moto.server as server
from moto import mock_rds2
'''
Test the different server responses
'''
#@mock_rds2
#def test_list_databases():
# backend = server.create_backend_app("rds2")
# test_client = backend.test_client()
#
# res = test_client.get('/?Action=DescribeDBInstances')
#
# res.data.decode("utf-8").should.contain("<DescribeDBInstancesResult>")