from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from moto.ec2.models import ec2_backends
from .models import rds_backends
class RDSResponse(BaseResponse):
    """Response handlers for RDS requests.

    Each handler reads its inputs from the request parameters, calls the
    matching method on the region's RDS backend, and renders the returned
    object(s) through one of the module-level templates.
    """

    @property
    def backend(self):
        """Return the entry of ``rds_backends`` for this response's region."""
        region_name = self.region
        return rds_backends[region_name]

    def _get_db_kwargs(self):
        """Collect the database request parameters into a keyword dict.

        The result is passed to the backend by ``create_dbinstance`` and
        ``modify_dbinstance``.
        """
        param = self._get_param
        int_param = self._get_int_param
        db_kwargs = {
            "auto_minor_version_upgrade": param("AutoMinorVersionUpgrade"),
            "allocated_storage": int_param("AllocatedStorage"),
            "availability_zone": param("AvailabilityZone"),
            "backup_retention_period": param("BackupRetentionPeriod"),
            "db_instance_class": param("DBInstanceClass"),
            "db_instance_identifier": param("DBInstanceIdentifier"),
            "db_name": param("DBName"),
            # Not read here: DBParameterGroupName
            "db_subnet_group_name": param("DBSubnetGroupName"),
            "engine": param("Engine"),
            "engine_version": param("EngineVersion"),
            "iops": int_param("Iops"),
            "master_password": param("MasterUserPassword"),
            "master_username": param("MasterUsername"),
            "multi_az": self._get_bool_param("MultiAZ"),
            # Not read here: OptionGroupName
            "port": param("Port"),
            # Not read here: PreferredBackupWindow
            # Not read here: PreferredMaintenanceWindow
            "publicly_accessible": param("PubliclyAccessible"),
            "region": self.region,
            "security_groups": self._get_multi_param("DBSecurityGroups.member"),
            "storage_type": param("StorageType"),
            # Not read here: VpcSecurityGroupIds.member.N
        }
        return db_kwargs

    def _get_db_replica_kwargs(self):
        """Collect the read-replica request parameters into a keyword dict.

        The result is passed to the backend by
        ``create_dbinstance_read_replica``.
        """
        param = self._get_param
        replica_kwargs = {
            "auto_minor_version_upgrade": param("AutoMinorVersionUpgrade"),
            "availability_zone": param("AvailabilityZone"),
            "db_instance_class": param("DBInstanceClass"),
            "db_instance_identifier": param("DBInstanceIdentifier"),
            "db_subnet_group_name": param("DBSubnetGroupName"),
            "iops": self._get_int_param("Iops"),
            # Not read here: OptionGroupName
            "port": param("Port"),
            "publicly_accessible": param("PubliclyAccessible"),
            "source_db_identifier": param("SourceDBInstanceIdentifier"),
            "storage_type": param("StorageType"),
        }
        return replica_kwargs

    def create_dbinstance(self):
        """Create a database from the request parameters and render it."""
        params = self._get_db_kwargs()
        created = self.backend.create_database(params)
        return self.response_template(CREATE_DATABASE_TEMPLATE).render(
            database=created
        )

    def create_dbinstance_read_replica(self):
        """Create a database replica from the request parameters and render it."""
        params = self._get_db_replica_kwargs()
        replica = self.backend.create_database_replica(params)
        return self.response_template(CREATE_DATABASE_REPLICA_TEMPLATE).render(
            database=replica
        )

    def describe_dbinstances(self):
        """Render the databases the backend returns for ``DBInstanceIdentifier``."""
        identifier = self._get_param("DBInstanceIdentifier")
        found = self.backend.describe_databases(identifier)
        return self.response_template(DESCRIBE_DATABASES_TEMPLATE).render(
            databases=found
        )

    def modify_dbinstance(self):
        """Modify the identified database with the request parameters and render it."""
        identifier = self._get_param("DBInstanceIdentifier")
        params = self._get_db_kwargs()
        modified = self.backend.modify_database(identifier, params)
        return self.response_template(MODIFY_DATABASE_TEMPLATE).render(
            database=modified
        )

    def delete_dbinstance(self):
        """Delete the identified database and render what the backend returns."""
        identifier = self._get_param("DBInstanceIdentifier")
        removed = self.backend.delete_database(identifier)
        return self.response_template(DELETE_DATABASE_TEMPLATE).render(
            database=removed
        )

    def create_dbsecurity_group(self):
        """Create a security group from its name and description and render it."""
        name = self._get_param("DBSecurityGroupName")
        summary = self._get_param("DBSecurityGroupDescription")
        created = self.backend.create_security_group(name, summary)
        return self.response_template(CREATE_SECURITY_GROUP_TEMPLATE).render(
            security_group=created
        )

    def describe_dbsecurity_groups(self):
        """Render the security groups the backend returns for ``DBSecurityGroupName``."""
        name = self._get_param("DBSecurityGroupName")
        found = self.backend.describe_security_groups(name)
        return self.response_template(DESCRIBE_SECURITY_GROUPS_TEMPLATE).render(
            security_groups=found
        )

    def delete_dbsecurity_group(self):
        """Delete the named security group and render the deletion template."""
        name = self._get_param("DBSecurityGroupName")
        removed = self.backend.delete_security_group(name)
        return self.response_template(DELETE_SECURITY_GROUP_TEMPLATE).render(
            security_group=removed
        )

    def authorize_dbsecurity_group_ingress(self):
        """Authorize the ``CIDRIP`` value on the named security group and render it."""
        name = self._get_param("DBSecurityGroupName")
        cidr = self._get_param("CIDRIP")
        authorized = self.backend.authorize_security_group(name, cidr)
        return self.response_template(AUTHORIZE_SECURITY_GROUP_TEMPLATE).render(
            security_group=authorized
        )

    def create_dbsubnet_group(self):
        """Create a subnet group from the listed subnet ids and render it.

        Each id in ``SubnetIds.member`` is looked up in the EC2 backend of this
        response's region before the group is created.
        """
        name = self._get_param("DBSubnetGroupName")
        summary = self._get_param("DBSubnetGroupDescription")
        requested_ids = self._get_multi_param("SubnetIds.member")
        resolved = [
            ec2_backends[self.region].get_subnet(one_id)
            for one_id in requested_ids
        ]
        created = self.backend.create_subnet_group(name, summary, resolved)
        return self.response_template(CREATE_SUBNET_GROUP_TEMPLATE).render(
            subnet_group=created
        )

    def describe_dbsubnet_groups(self):
        """Render the subnet groups the backend returns for ``DBSubnetGroupName``."""
        name = self._get_param("DBSubnetGroupName")
        found = self.backend.describe_subnet_groups(name)
        return self.response_template(DESCRIBE_SUBNET_GROUPS_TEMPLATE).render(
            subnet_groups=found
        )

    def delete_dbsubnet_group(self):
        """Delete the named subnet group and render the deletion template."""
        name = self._get_param("DBSubnetGroupName")
        removed = self.backend.delete_subnet_group(name)
        return self.response_template(DELETE_SUBNET_GROUP_TEMPLATE).render(
            subnet_group=removed
        )
# Jinja-style response bodies for the DB instance handlers. Each body starts
# with a newline, emits ``to_xml()`` of the rendered database(s), and ends with
# a fixed UUID-like identifier (presumably the request id).
# NOTE(review): no XML envelope elements appear in these bodies -- confirm
# against the response format clients expect.
CREATE_DATABASE_TEMPLATE = (
    "\n"
    "{{ database.to_xml() }}\n"
    "523e3218-afc7-11c3-90f5-f90431260ab4\n"
)

CREATE_DATABASE_REPLICA_TEMPLATE = (
    "\n"
    "{{ database.to_xml() }}\n"
    "ba8dedf0-bb9a-11d3-855b-576787000e19\n"
)

# Emits one ``to_xml()`` line per entry in ``databases``.
DESCRIBE_DATABASES_TEMPLATE = (
    "\n"
    "{% for database in databases %}\n"
    "{{ database.to_xml() }}\n"
    "{% endfor %}\n"
    "01b2685a-b978-11d3-f272-7cd6cce12cc5\n"
)

MODIFY_DATABASE_TEMPLATE = (
    "\n"
    "{{ database.to_xml() }}\n"
    "f643f1ac-bbfe-11d3-f4c6-37db295f7674\n"
)

DELETE_DATABASE_TEMPLATE = (
    "\n"
    "{{ database.to_xml() }}\n"
    "7369556f-b70d-11c3-faca-6ba18376ea1b\n"
)
# Jinja-style response bodies for the DB security group handlers. Each body
# starts with a newline and ends with a fixed UUID-like identifier (presumably
# the request id).
# NOTE(review): no XML envelope elements appear in these bodies -- confirm
# against the response format clients expect.
CREATE_SECURITY_GROUP_TEMPLATE = (
    "\n"
    "{{ security_group.to_xml() }}\n"
    "e68ef6fa-afc1-11c3-845a-476777009d19\n"
)

# Emits one ``to_xml()`` line per entry in ``security_groups``.
DESCRIBE_SECURITY_GROUPS_TEMPLATE = (
    "\n"
    "{% for security_group in security_groups %}\n"
    "{{ security_group.to_xml() }}\n"
    "{% endfor %}\n"
    "b76e692c-b98c-11d3-a907-5a2c468b9cb0\n"
)

# The deletion body does not reference the deleted group at all.
DELETE_SECURITY_GROUP_TEMPLATE = (
    "\n"
    "7aec7454-ba25-11d3-855b-576787000e19\n"
)

AUTHORIZE_SECURITY_GROUP_TEMPLATE = (
    "\n"
    "{{ security_group.to_xml() }}\n"
    "6176b5f8-bfed-11d3-f92b-31fa5e8dbc99\n"
)
# Jinja-style response bodies for the DB subnet group handlers. Each body
# starts with a newline and ends with a fixed UUID-like identifier (presumably
# the request id).
# NOTE(review): no XML envelope elements appear in these bodies -- confirm
# against the response format clients expect.
CREATE_SUBNET_GROUP_TEMPLATE = (
    "\n"
    "{{ subnet_group.to_xml() }}\n"
    "3a401b3f-bb9e-11d3-f4c6-37db295f7674\n"
)

# Emits one ``to_xml()`` line per entry in ``subnet_groups``.
DESCRIBE_SUBNET_GROUPS_TEMPLATE = (
    "\n"
    "{% for subnet_group in subnet_groups %}\n"
    "{{ subnet_group.to_xml() }}\n"
    "{% endfor %}\n"
    "b783db3b-b98c-11d3-fbc7-5c0aad74da7c\n"
)

# The deletion body does not reference the deleted group at all.
DELETE_SUBNET_GROUP_TEMPLATE = (
    "\n"
    "6295e5ab-bbf3-11d3-f4c6-37db295f7674\n"
)