moto/tests/test_rds2/test_rds2.py
Brian Pandola ae2865d559
Minor RDS Clean up (#3682)
* Fix `DBInstanceNotFound` error message

Changed from `Database` to `DBInstance`, which is actually what comes back from AWS.

* Remove duplicate test

The removed test actually fails if run in isolation because `rds2` is not a
valid boto3 client service.  The reason this test never caused CI to fail is
because it is redefined later in the test suite, effectively making it dead
code that will never run.

Duplicate test has been removed and the remaining test has been improved
with more explicit asserts.
2021-02-13 11:12:02 +00:00

1769 lines
60 KiB
Python

from __future__ import unicode_literals
from botocore.exceptions import ClientError
import boto3
import pytest
import sure # noqa
from moto import mock_ec2, mock_kms, mock_rds2
from moto.core import ACCOUNT_ID
@mock_rds2
def test_create_database():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
LicenseModel="license-included",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
VpcSecurityGroupIds=["sg-123456"],
)
db_instance = database["DBInstance"]
db_instance["AllocatedStorage"].should.equal(10)
db_instance["DBInstanceClass"].should.equal("db.m1.small")
db_instance["LicenseModel"].should.equal("license-included")
db_instance["MasterUsername"].should.equal("root")
db_instance["DBSecurityGroups"][0]["DBSecurityGroupName"].should.equal("my_sg")
db_instance["DBInstanceArn"].should.equal(
"arn:aws:rds:us-west-2:1234567890:db:db-master-1"
)
db_instance["DBInstanceStatus"].should.equal("available")
db_instance["DBName"].should.equal("staging-postgres")
db_instance["DBInstanceIdentifier"].should.equal("db-master-1")
db_instance["IAMDatabaseAuthenticationEnabled"].should.equal(False)
db_instance["DbiResourceId"].should.contain("db-")
db_instance["CopyTagsToSnapshot"].should.equal(False)
db_instance["InstanceCreateTime"].should.be.a("datetime.datetime")
db_instance["VpcSecurityGroups"][0]["VpcSecurityGroupId"].should.equal("sg-123456")
@mock_rds2
def test_create_database_no_allocated_storage():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
)
db_instance = database["DBInstance"]
db_instance["Engine"].should.equal("postgres")
db_instance["StorageType"].should.equal("gp2")
db_instance["AllocatedStorage"].should.equal(20)
@mock_rds2
def test_create_database_non_existing_option_group():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance.when.called_with(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
OptionGroupName="non-existing",
).should.throw(ClientError)
@mock_rds2
def test_create_database_with_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="my-og",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
OptionGroupName="my-og",
)
db_instance = database["DBInstance"]
db_instance["AllocatedStorage"].should.equal(10)
db_instance["DBInstanceClass"].should.equal("db.m1.small")
db_instance["DBName"].should.equal("staging-postgres")
db_instance["OptionGroupMemberships"][0]["OptionGroupName"].should.equal("my-og")
@mock_rds2
def test_stop_database():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
LicenseModel="license-included",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
mydb = conn.describe_db_instances(
DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"]
)["DBInstances"][0]
mydb["DBInstanceStatus"].should.equal("available")
# test stopping database should shutdown
response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"])
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response["DBInstance"]["DBInstanceStatus"].should.equal("stopped")
# test rdsclient error when trying to stop an already stopped database
conn.stop_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"]
).should.throw(ClientError)
# test stopping a stopped database with snapshot should error and no snapshot should exist for that call
conn.stop_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"],
DBSnapshotIdentifier="rocky4570-rds-snap",
).should.throw(ClientError)
response = conn.describe_db_snapshots()
response["DBSnapshots"].should.equal([])
@mock_rds2
def test_start_database():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
LicenseModel="license-included",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
mydb = conn.describe_db_instances(
DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"]
)["DBInstances"][0]
mydb["DBInstanceStatus"].should.equal("available")
# test starting an already started database should error
conn.start_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"]
).should.throw(ClientError)
# stop and test start - should go from stopped to available, create snapshot and check snapshot
response = conn.stop_db_instance(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"],
DBSnapshotIdentifier="rocky4570-rds-snap",
)
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response["DBInstance"]["DBInstanceStatus"].should.equal("stopped")
response = conn.describe_db_snapshots()
response["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal(
"rocky4570-rds-snap"
)
response = conn.start_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"])
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response["DBInstance"]["DBInstanceStatus"].should.equal("available")
# starting database should not remove snapshot
response = conn.describe_db_snapshots()
response["DBSnapshots"][0]["DBSnapshotIdentifier"].should.equal(
"rocky4570-rds-snap"
)
# test stopping database, create snapshot with existing snapshot already created should throw error
conn.stop_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"],
DBSnapshotIdentifier="rocky4570-rds-snap",
).should.throw(ClientError)
# test stopping database not invoking snapshot should succeed.
response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"])
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response["DBInstance"]["DBInstanceStatus"].should.equal("stopped")
@mock_rds2
def test_fail_to_stop_multi_az_and_sqlserver():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="sqlserver-ee",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
LicenseModel="license-included",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
MultiAZ=True,
)
mydb = conn.describe_db_instances(
DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"]
)["DBInstances"][0]
mydb["DBInstanceStatus"].should.equal("available")
# multi-az databases arent allowed to be shutdown at this time.
conn.stop_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"]
).should.throw(ClientError)
# multi-az databases arent allowed to be started up at this time.
conn.start_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"]
).should.throw(ClientError)
@mock_rds2
def test_stop_multi_az_postgres():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
LicenseModel="license-included",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
MultiAZ=True,
)
mydb = conn.describe_db_instances(
DBInstanceIdentifier=database["DBInstance"]["DBInstanceIdentifier"]
)["DBInstances"][0]
mydb["DBInstanceStatus"].should.equal("available")
response = conn.stop_db_instance(DBInstanceIdentifier=mydb["DBInstanceIdentifier"])
response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200)
response["DBInstance"]["DBInstanceStatus"].should.equal("stopped")
@mock_rds2
def test_fail_to_stop_readreplica():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
LicenseModel="license-included",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
replica = conn.create_db_instance_read_replica(
DBInstanceIdentifier="db-replica-1",
SourceDBInstanceIdentifier="db-master-1",
DBInstanceClass="db.m1.small",
)
mydb = conn.describe_db_instances(
DBInstanceIdentifier=replica["DBInstance"]["DBInstanceIdentifier"]
)["DBInstances"][0]
mydb["DBInstanceStatus"].should.equal("available")
# read-replicas are not allowed to be stopped at this time.
conn.stop_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"]
).should.throw(ClientError)
# read-replicas are not allowed to be started at this time.
conn.start_db_instance.when.called_with(
DBInstanceIdentifier=mydb["DBInstanceIdentifier"]
).should.throw(ClientError)
@mock_rds2
def test_get_databases():
conn = boto3.client("rds", region_name="us-west-2")
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(0)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-2",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(2)
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
list(instances["DBInstances"]).should.have.length_of(1)
instances["DBInstances"][0]["DBInstanceIdentifier"].should.equal("db-master-1")
instances["DBInstances"][0]["DBInstanceArn"].should.equal(
"arn:aws:rds:us-west-2:1234567890:db:db-master-1"
)
@mock_rds2
def test_get_databases_paginated():
conn = boto3.client("rds", region_name="us-west-2")
for i in range(51):
conn.create_db_instance(
AllocatedStorage=5,
Port=5432,
DBInstanceIdentifier="rds%d" % i,
DBInstanceClass="db.t1.micro",
Engine="postgres",
)
resp = conn.describe_db_instances()
resp["DBInstances"].should.have.length_of(50)
resp["Marker"].should.equal(resp["DBInstances"][-1]["DBInstanceIdentifier"])
resp2 = conn.describe_db_instances(Marker=resp["Marker"])
resp2["DBInstances"].should.have.length_of(1)
resp3 = conn.describe_db_instances(MaxRecords=100)
resp3["DBInstances"].should.have.length_of(51)
@mock_rds2
def test_describe_non_existent_database():
conn = boto3.client("rds", region_name="us-west-2")
conn.describe_db_instances.when.called_with(
DBInstanceIdentifier="not-a-db"
).should.throw(ClientError)
@mock_rds2
def test_modify_db_instance():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
instances["DBInstances"][0]["AllocatedStorage"].should.equal(10)
conn.modify_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=20,
ApplyImmediately=True,
VpcSecurityGroupIds=["sg-123456"],
)
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
instances["DBInstances"][0]["AllocatedStorage"].should.equal(20)
instances["DBInstances"][0]["VpcSecurityGroups"][0][
"VpcSecurityGroupId"
].should.equal("sg-123456")
@mock_rds2
def test_rename_db_instance():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
list(instances["DBInstances"]).should.have.length_of(1)
conn.describe_db_instances.when.called_with(
DBInstanceIdentifier="db-master-2"
).should.throw(ClientError)
conn.modify_db_instance(
DBInstanceIdentifier="db-master-1",
NewDBInstanceIdentifier="db-master-2",
ApplyImmediately=True,
)
conn.describe_db_instances.when.called_with(
DBInstanceIdentifier="db-master-1"
).should.throw(ClientError)
instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2")
list(instances["DBInstances"]).should.have.length_of(1)
@mock_rds2
def test_modify_non_existent_database():
conn = boto3.client("rds", region_name="us-west-2")
conn.modify_db_instance.when.called_with(
DBInstanceIdentifier="not-a-db", AllocatedStorage=20, ApplyImmediately=True
).should.throw(ClientError)
@mock_rds2
def test_reboot_db_instance():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
database = conn.reboot_db_instance(DBInstanceIdentifier="db-master-1")
database["DBInstance"]["DBInstanceIdentifier"].should.equal("db-master-1")
@mock_rds2
def test_reboot_non_existent_database():
conn = boto3.client("rds", region_name="us-west-2")
conn.reboot_db_instance.when.called_with(
DBInstanceIdentifier="not-a-db"
).should.throw(ClientError)
@mock_rds2
def test_delete_database():
conn = boto3.client("rds", region_name="us-west-2")
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(0)
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(1)
conn.delete_db_instance(
DBInstanceIdentifier="db-primary-1",
FinalDBSnapshotIdentifier="primary-1-snapshot",
)
instances = conn.describe_db_instances()
list(instances["DBInstances"]).should.have.length_of(0)
# Saved the snapshot
snapshots = conn.describe_db_snapshots(DBInstanceIdentifier="db-primary-1").get(
"DBSnapshots"
)
snapshots[0].get("Engine").should.equal("postgres")
@mock_rds2
def test_create_db_snapshots():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_snapshot.when.called_with(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
).should.throw(ClientError)
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
snapshot = conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1"
).get("DBSnapshot")
snapshot.get("Engine").should.equal("postgres")
snapshot.get("DBInstanceIdentifier").should.equal("db-primary-1")
snapshot.get("DBSnapshotIdentifier").should.equal("g-1")
result = conn.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"])
result["TagList"].should.equal([])
@mock_rds2
def test_create_db_snapshots_copy_tags():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_snapshot.when.called_with(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
).should.throw(ClientError)
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
CopyTagsToSnapshot=True,
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
snapshot = conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="g-1"
).get("DBSnapshot")
snapshot.get("Engine").should.equal("postgres")
snapshot.get("DBInstanceIdentifier").should.equal("db-primary-1")
snapshot.get("DBSnapshotIdentifier").should.equal("g-1")
result = conn.list_tags_for_resource(ResourceName=snapshot["DBSnapshotArn"])
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_rds2
def test_describe_db_snapshots():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
created = conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
).get("DBSnapshot")
created.get("Engine").should.equal("postgres")
by_database_id = conn.describe_db_snapshots(
DBInstanceIdentifier="db-primary-1"
).get("DBSnapshots")
by_snapshot_id = conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get(
"DBSnapshots"
)
by_snapshot_id.should.equal(by_database_id)
snapshot = by_snapshot_id[0]
snapshot.should.equal(created)
snapshot.get("Engine").should.equal("postgres")
conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-2"
)
snapshots = conn.describe_db_snapshots(DBInstanceIdentifier="db-primary-1").get(
"DBSnapshots"
)
snapshots.should.have.length_of(2)
@mock_rds2
def test_delete_db_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1", DBSnapshotIdentifier="snapshot-1"
)
conn.describe_db_snapshots(DBSnapshotIdentifier="snapshot-1").get("DBSnapshots")[0]
conn.delete_db_snapshot(DBSnapshotIdentifier="snapshot-1")
conn.describe_db_snapshots.when.called_with(
DBSnapshotIdentifier="snapshot-1"
).should.throw(ClientError)
@mock_rds2
def test_create_option_group():
conn = boto3.client("rds", region_name="us-west-2")
option_group = conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
option_group["OptionGroup"]["OptionGroupName"].should.equal("test")
option_group["OptionGroup"]["EngineName"].should.equal("mysql")
option_group["OptionGroup"]["OptionGroupDescription"].should.equal(
"test option group"
)
option_group["OptionGroup"]["MajorEngineVersion"].should.equal("5.6")
@mock_rds2
def test_create_option_group_bad_engine_name():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group.when.called_with(
OptionGroupName="test",
EngineName="invalid_engine",
MajorEngineVersion="5.6",
OptionGroupDescription="test invalid engine",
).should.throw(ClientError)
@mock_rds2
def test_create_option_group_bad_engine_major_version():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group.when.called_with(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="6.6.6",
OptionGroupDescription="test invalid engine version",
).should.throw(ClientError)
@mock_rds2
def test_create_option_group_empty_description():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group.when.called_with(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="",
).should.throw(ClientError)
@mock_rds2
def test_create_option_group_duplicate():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
conn.create_option_group.when.called_with(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
).should.throw(ClientError)
@mock_rds2
def test_describe_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
option_groups = conn.describe_option_groups(OptionGroupName="test")
option_groups["OptionGroupsList"][0]["OptionGroupName"].should.equal("test")
@mock_rds2
def test_describe_non_existent_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.describe_option_groups.when.called_with(
OptionGroupName="not-a-option-group"
).should.throw(ClientError)
@mock_rds2
def test_delete_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
option_groups = conn.describe_option_groups(OptionGroupName="test")
option_groups["OptionGroupsList"][0]["OptionGroupName"].should.equal("test")
conn.delete_option_group(OptionGroupName="test")
conn.describe_option_groups.when.called_with(OptionGroupName="test").should.throw(
ClientError
)
@mock_rds2
def test_delete_non_existent_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.delete_option_group.when.called_with(
OptionGroupName="non-existent"
).should.throw(ClientError)
@mock_rds2
def test_describe_option_group_options():
conn = boto3.client("rds", region_name="us-west-2")
option_group_options = conn.describe_option_group_options(EngineName="sqlserver-ee")
len(option_group_options["OptionGroupOptions"]).should.equal(4)
option_group_options = conn.describe_option_group_options(
EngineName="sqlserver-ee", MajorEngineVersion="11.00"
)
len(option_group_options["OptionGroupOptions"]).should.equal(2)
option_group_options = conn.describe_option_group_options(
EngineName="mysql", MajorEngineVersion="5.6"
)
len(option_group_options["OptionGroupOptions"]).should.equal(1)
conn.describe_option_group_options.when.called_with(
EngineName="non-existent"
).should.throw(ClientError)
conn.describe_option_group_options.when.called_with(
EngineName="mysql", MajorEngineVersion="non-existent"
).should.throw(ClientError)
@mock_rds2
def test_modify_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
# TODO: create option and validate before deleting.
# if Someone can tell me how the hell to use this function
# to add options to an option_group, I can finish coding this.
result = conn.modify_option_group(
OptionGroupName="test",
OptionsToInclude=[],
OptionsToRemove=["MEMCACHED"],
ApplyImmediately=True,
)
result["OptionGroup"]["EngineName"].should.equal("mysql")
result["OptionGroup"]["Options"].should.equal([])
result["OptionGroup"]["OptionGroupName"].should.equal("test")
@mock_rds2
def test_modify_option_group_no_options():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
conn.modify_option_group.when.called_with(OptionGroupName="test").should.throw(
ClientError
)
@mock_rds2
def test_modify_non_existent_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.modify_option_group.when.called_with(
OptionGroupName="non-existent",
OptionsToInclude=[{"OptionName": "test-option"}],
).should.throw(ClientError, "Specified OptionGroupName: non-existent not found.")
@mock_rds2
def test_delete_non_existent_database():
conn = boto3.client("rds", region_name="us-west-2")
with pytest.raises(ClientError) as ex:
conn.delete_db_instance(DBInstanceIdentifier="non-existent")
ex.value.response["Error"]["Code"].should.equal("DBInstanceNotFound")
ex.value.response["Error"]["Message"].should.equal(
"DBInstance non-existent not found."
)
@mock_rds2
def test_list_tags_invalid_arn():
conn = boto3.client("rds", region_name="us-west-2")
conn.list_tags_for_resource.when.called_with(
ResourceName="arn:aws:rds:bad-arn"
).should.throw(ClientError)
@mock_rds2
def test_list_tags_db():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:foo"
)
result["TagList"].should.equal([])
test_instance = conn.create_db_instance(
DBInstanceIdentifier="db-with-tags",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
result = conn.list_tags_for_resource(
ResourceName=test_instance["DBInstance"]["DBInstanceArn"]
)
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_rds2
def test_add_tags_db():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-without-tags",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags"
)
list(result["TagList"]).should.have.length_of(2)
conn.add_tags_to_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags",
Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-without-tags"
)
list(result["TagList"]).should.have.length_of(3)
@mock_rds2
def test_remove_tags_db():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-with-tags",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags"
)
list(result["TagList"]).should.have.length_of(2)
conn.remove_tags_from_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags", TagKeys=["foo"]
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:db:db-with-tags"
)
len(result["TagList"]).should.equal(1)
@mock_rds2
def test_list_tags_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:foo"
)
result["TagList"].should.equal([])
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
snapshot = conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1",
DBSnapshotIdentifier="snapshot-with-tags",
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
result = conn.list_tags_for_resource(
ResourceName=snapshot["DBSnapshot"]["DBSnapshotArn"]
)
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_rds2
def test_add_tags_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
snapshot = conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1",
DBSnapshotIdentifier="snapshot-without-tags",
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags"
)
list(result["TagList"]).should.have.length_of(2)
conn.add_tags_to_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags",
Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags"
)
list(result["TagList"]).should.have.length_of(3)
@mock_rds2
def test_remove_tags_snapshot():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-primary-1",
AllocatedStorage=10,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
snapshot = conn.create_db_snapshot(
DBInstanceIdentifier="db-primary-1",
DBSnapshotIdentifier="snapshot-with-tags",
Tags=[{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags"
)
list(result["TagList"]).should.have.length_of(2)
conn.remove_tags_from_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags",
TagKeys=["foo"],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags"
)
len(result["TagList"]).should.equal(1)
@mock_rds2
def test_add_tags_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test"
)
list(result["TagList"]).should.have.length_of(0)
conn.add_tags_to_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test",
Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test"
)
list(result["TagList"]).should.have.length_of(2)
@mock_rds2
def test_remove_tags_option_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_option_group(
OptionGroupName="test",
EngineName="mysql",
MajorEngineVersion="5.6",
OptionGroupDescription="test option group",
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test"
)
conn.add_tags_to_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test",
Tags=[{"Key": "foo", "Value": "fish"}, {"Key": "foo2", "Value": "bar2"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test"
)
list(result["TagList"]).should.have.length_of(2)
conn.remove_tags_from_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test", TagKeys=["foo"]
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:og:test"
)
list(result["TagList"]).should.have.length_of(1)
@mock_rds2
def test_create_database_security_group():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.create_db_security_group(
DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group"
)
result["DBSecurityGroup"]["DBSecurityGroupName"].should.equal("db_sg")
result["DBSecurityGroup"]["DBSecurityGroupDescription"].should.equal(
"DB Security Group"
)
result["DBSecurityGroup"]["IPRanges"].should.equal([])
@mock_rds2
def test_get_security_groups():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_security_groups()
result["DBSecurityGroups"].should.have.length_of(0)
conn.create_db_security_group(
DBSecurityGroupName="db_sg1", DBSecurityGroupDescription="DB Security Group"
)
conn.create_db_security_group(
DBSecurityGroupName="db_sg2", DBSecurityGroupDescription="DB Security Group"
)
result = conn.describe_db_security_groups()
result["DBSecurityGroups"].should.have.length_of(2)
result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg1")
result["DBSecurityGroups"].should.have.length_of(1)
result["DBSecurityGroups"][0]["DBSecurityGroupName"].should.equal("db_sg1")
@mock_rds2
def test_get_non_existent_security_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.describe_db_security_groups.when.called_with(
DBSecurityGroupName="not-a-sg"
).should.throw(ClientError)
@mock_rds2
def test_delete_database_security_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_security_group(
DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group"
)
result = conn.describe_db_security_groups()
result["DBSecurityGroups"].should.have.length_of(1)
conn.delete_db_security_group(DBSecurityGroupName="db_sg")
result = conn.describe_db_security_groups()
result["DBSecurityGroups"].should.have.length_of(0)
@mock_rds2
def test_delete_non_existent_security_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.delete_db_security_group.when.called_with(
DBSecurityGroupName="not-a-db"
).should.throw(ClientError)
@mock_rds2
def test_security_group_authorize():
conn = boto3.client("rds", region_name="us-west-2")
security_group = conn.create_db_security_group(
DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group"
)
security_group["DBSecurityGroup"]["IPRanges"].should.equal([])
conn.authorize_db_security_group_ingress(
DBSecurityGroupName="db_sg", CIDRIP="10.3.2.45/32"
)
result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg")
result["DBSecurityGroups"][0]["IPRanges"].should.have.length_of(1)
result["DBSecurityGroups"][0]["IPRanges"].should.equal(
[{"Status": "authorized", "CIDRIP": "10.3.2.45/32"}]
)
conn.authorize_db_security_group_ingress(
DBSecurityGroupName="db_sg", CIDRIP="10.3.2.46/32"
)
result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg")
result["DBSecurityGroups"][0]["IPRanges"].should.have.length_of(2)
result["DBSecurityGroups"][0]["IPRanges"].should.equal(
[
{"Status": "authorized", "CIDRIP": "10.3.2.45/32"},
{"Status": "authorized", "CIDRIP": "10.3.2.46/32"},
]
)
@mock_rds2
def test_add_security_group_to_database():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
DBInstanceClass="postgres",
Engine="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
)
result = conn.describe_db_instances()
result["DBInstances"][0]["DBSecurityGroups"].should.equal([])
conn.create_db_security_group(
DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group"
)
conn.modify_db_instance(
DBInstanceIdentifier="db-master-1", DBSecurityGroups=["db_sg"]
)
result = conn.describe_db_instances()
result["DBInstances"][0]["DBSecurityGroups"][0]["DBSecurityGroupName"].should.equal(
"db_sg"
)
@mock_rds2
def test_list_tags_security_group():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
security_group = conn.create_db_security_group(
DBSecurityGroupName="db_sg",
DBSecurityGroupDescription="DB Security Group",
Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}],
)["DBSecurityGroup"]["DBSecurityGroupName"]
resource = "arn:aws:rds:us-west-2:1234567890:secgrp:{0}".format(security_group)
result = conn.list_tags_for_resource(ResourceName=resource)
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_rds2
def test_add_tags_security_group():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
security_group = conn.create_db_security_group(
DBSecurityGroupName="db_sg", DBSecurityGroupDescription="DB Security Group"
)["DBSecurityGroup"]["DBSecurityGroupName"]
resource = "arn:aws:rds:us-west-2:1234567890:secgrp:{0}".format(security_group)
conn.add_tags_to_resource(
ResourceName=resource,
Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}],
)
result = conn.list_tags_for_resource(ResourceName=resource)
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_rds2
def test_remove_tags_security_group():
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
security_group = conn.create_db_security_group(
DBSecurityGroupName="db_sg",
DBSecurityGroupDescription="DB Security Group",
Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}],
)["DBSecurityGroup"]["DBSecurityGroupName"]
resource = "arn:aws:rds:us-west-2:1234567890:secgrp:{0}".format(security_group)
conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"])
result = conn.list_tags_for_resource(ResourceName=resource)
result["TagList"].should.equal([{"Value": "bar1", "Key": "foo1"}])
@mock_ec2
@mock_rds2
def test_create_database_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet1 = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
subnet2 = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.2.0/24")[
"Subnet"
]
subnet_ids = [subnet1["SubnetId"], subnet2["SubnetId"]]
conn = boto3.client("rds", region_name="us-west-2")
result = conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet",
DBSubnetGroupDescription="my db subnet",
SubnetIds=subnet_ids,
)
result["DBSubnetGroup"]["DBSubnetGroupName"].should.equal("db_subnet")
result["DBSubnetGroup"]["DBSubnetGroupDescription"].should.equal("my db subnet")
subnets = result["DBSubnetGroup"]["Subnets"]
subnet_group_ids = [subnets[0]["SubnetIdentifier"], subnets[1]["SubnetIdentifier"]]
list(subnet_group_ids).should.equal(subnet_ids)
@mock_ec2
@mock_rds2
def test_create_database_in_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet1",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
)
conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSubnetGroupName="db_subnet1",
)
result = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
result["DBInstances"][0]["DBSubnetGroup"]["DBSubnetGroupName"].should.equal(
"db_subnet1"
)
@mock_ec2
@mock_rds2
def test_describe_database_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet1",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
)
conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet2",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
)
resp = conn.describe_db_subnet_groups()
resp["DBSubnetGroups"].should.have.length_of(2)
subnets = resp["DBSubnetGroups"][0]["Subnets"]
subnets.should.have.length_of(1)
list(
conn.describe_db_subnet_groups(DBSubnetGroupName="db_subnet1")["DBSubnetGroups"]
).should.have.length_of(1)
conn.describe_db_subnet_groups.when.called_with(
DBSubnetGroupName="not-a-subnet"
).should.throw(ClientError)
@mock_ec2
@mock_rds2
def test_delete_database_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet1",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
)
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(1)
conn.delete_db_subnet_group(DBSubnetGroupName="db_subnet1")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
conn.delete_db_subnet_group.when.called_with(
DBSubnetGroupName="db_subnet1"
).should.throw(ClientError)
@mock_ec2
@mock_rds2
def test_list_tags_database_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
subnet = conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet1",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}],
)["DBSubnetGroup"]["DBSubnetGroupName"]
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:subgrp:{0}".format(subnet)
)
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_ec2
@mock_rds2
def test_add_tags_database_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
subnet = conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet1",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
Tags=[],
)["DBSubnetGroup"]["DBSubnetGroupName"]
resource = "arn:aws:rds:us-west-2:1234567890:subgrp:{0}".format(subnet)
conn.add_tags_to_resource(
ResourceName=resource,
Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}],
)
result = conn.list_tags_for_resource(ResourceName=resource)
result["TagList"].should.equal(
[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}]
)
@mock_ec2
@mock_rds2
def test_remove_tags_database_subnet_group():
vpc_conn = boto3.client("ec2", "us-west-2")
vpc = vpc_conn.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]
subnet = vpc_conn.create_subnet(VpcId=vpc["VpcId"], CidrBlock="10.0.1.0/24")[
"Subnet"
]
conn = boto3.client("rds", region_name="us-west-2")
result = conn.describe_db_subnet_groups()
result["DBSubnetGroups"].should.have.length_of(0)
subnet = conn.create_db_subnet_group(
DBSubnetGroupName="db_subnet1",
DBSubnetGroupDescription="my db subnet",
SubnetIds=[subnet["SubnetId"]],
Tags=[{"Value": "bar", "Key": "foo"}, {"Value": "bar1", "Key": "foo1"}],
)["DBSubnetGroup"]["DBSubnetGroupName"]
resource = "arn:aws:rds:us-west-2:1234567890:subgrp:{0}".format(subnet)
conn.remove_tags_from_resource(ResourceName=resource, TagKeys=["foo"])
result = conn.list_tags_for_resource(ResourceName=resource)
result["TagList"].should.equal([{"Value": "bar1", "Key": "foo1"}])
@mock_rds2
def test_create_database_replica():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
)
replica = conn.create_db_instance_read_replica(
DBInstanceIdentifier="db-replica-1",
SourceDBInstanceIdentifier="db-master-1",
DBInstanceClass="db.m1.small",
)
replica["DBInstance"]["ReadReplicaSourceDBInstanceIdentifier"].should.equal(
"db-master-1"
)
replica["DBInstance"]["DBInstanceClass"].should.equal("db.m1.small")
replica["DBInstance"]["DBInstanceIdentifier"].should.equal("db-replica-1")
master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"].should.equal(
["db-replica-1"]
)
conn.delete_db_instance(DBInstanceIdentifier="db-replica-1", SkipFinalSnapshot=True)
master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
master["DBInstances"][0]["ReadReplicaDBInstanceIdentifiers"].should.equal([])
@mock_rds2
@mock_kms
def test_create_database_with_encrypted_storage():
kms_conn = boto3.client("kms", region_name="us-west-2")
key = kms_conn.create_key(
Policy="my RDS encryption policy",
Description="RDS encryption key",
KeyUsage="ENCRYPT_DECRYPT",
)
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
DBSecurityGroups=["my_sg"],
StorageEncrypted=True,
KmsKeyId=key["KeyMetadata"]["KeyId"],
)
database["DBInstance"]["StorageEncrypted"].should.equal(True)
database["DBInstance"]["KmsKeyId"].should.equal(key["KeyMetadata"]["KeyId"])
@mock_rds2
def test_create_db_parameter_group():
region = "us-west-2"
pg_name = "test"
conn = boto3.client("rds", region_name=region)
db_parameter_group = conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
db_parameter_group["DBParameterGroup"]["DBParameterGroupName"].should.equal("test")
db_parameter_group["DBParameterGroup"]["DBParameterGroupFamily"].should.equal(
"mysql5.6"
)
db_parameter_group["DBParameterGroup"]["Description"].should.equal(
"test parameter group"
)
db_parameter_group["DBParameterGroup"]["DBParameterGroupArn"].should.equal(
"arn:aws:rds:{0}:{1}:pg:{2}".format(region, ACCOUNT_ID, pg_name)
)
@mock_rds2
def test_create_db_instance_with_parameter_group():
conn = boto3.client("rds", region_name="us-west-2")
db_parameter_group = conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="mysql",
DBInstanceClass="db.m1.small",
DBParameterGroupName="test",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
)
len(database["DBInstance"]["DBParameterGroups"]).should.equal(1)
database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"].should.equal(
"test"
)
database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"].should.equal(
"in-sync"
)
@mock_rds2
def test_create_database_with_default_port():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="postgres",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
DBSecurityGroups=["my_sg"],
)
database["DBInstance"]["Endpoint"]["Port"].should.equal(5432)
@mock_rds2
def test_modify_db_instance_with_parameter_group():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="db-master-1",
AllocatedStorage=10,
Engine="mysql",
DBInstanceClass="db.m1.small",
MasterUsername="root",
MasterUserPassword="hunter2",
Port=1234,
)
len(database["DBInstance"]["DBParameterGroups"]).should.equal(1)
database["DBInstance"]["DBParameterGroups"][0]["DBParameterGroupName"].should.equal(
"default.mysql5.6"
)
database["DBInstance"]["DBParameterGroups"][0]["ParameterApplyStatus"].should.equal(
"in-sync"
)
db_parameter_group = conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
conn.modify_db_instance(
DBInstanceIdentifier="db-master-1",
DBParameterGroupName="test",
ApplyImmediately=True,
)
database = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")[
"DBInstances"
][0]
len(database["DBParameterGroups"]).should.equal(1)
database["DBParameterGroups"][0]["DBParameterGroupName"].should.equal("test")
database["DBParameterGroups"][0]["ParameterApplyStatus"].should.equal("in-sync")
@mock_rds2
def test_create_db_parameter_group_empty_description():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_parameter_group.when.called_with(
DBParameterGroupName="test", DBParameterGroupFamily="mysql5.6", Description=""
).should.throw(ClientError)
@mock_rds2
def test_create_db_parameter_group_duplicate():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
conn.create_db_parameter_group.when.called_with(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
).should.throw(ClientError)
@mock_rds2
def test_describe_db_parameter_group():
region = "us-west-2"
pg_name = "test"
conn = boto3.client("rds", region_name=region)
conn.create_db_parameter_group(
DBParameterGroupName=pg_name,
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test")
db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"].should.equal(
"test"
)
db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupArn"].should.equal(
"arn:aws:rds:{0}:{1}:pg:{2}".format(region, ACCOUNT_ID, pg_name)
)
@mock_rds2
def test_describe_non_existent_db_parameter_group():
conn = boto3.client("rds", region_name="us-west-2")
db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test")
len(db_parameter_groups["DBParameterGroups"]).should.equal(0)
@mock_rds2
def test_delete_db_parameter_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test")
db_parameter_groups["DBParameterGroups"][0]["DBParameterGroupName"].should.equal(
"test"
)
conn.delete_db_parameter_group(DBParameterGroupName="test")
db_parameter_groups = conn.describe_db_parameter_groups(DBParameterGroupName="test")
len(db_parameter_groups["DBParameterGroups"]).should.equal(0)
@mock_rds2
def test_modify_db_parameter_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
)
modify_result = conn.modify_db_parameter_group(
DBParameterGroupName="test",
Parameters=[
{
"ParameterName": "foo",
"ParameterValue": "foo_val",
"Description": "test param",
"ApplyMethod": "immediate",
}
],
)
modify_result["DBParameterGroupName"].should.equal("test")
db_parameters = conn.describe_db_parameters(DBParameterGroupName="test")
db_parameters["Parameters"][0]["ParameterName"].should.equal("foo")
db_parameters["Parameters"][0]["ParameterValue"].should.equal("foo_val")
db_parameters["Parameters"][0]["Description"].should.equal("test param")
db_parameters["Parameters"][0]["ApplyMethod"].should.equal("immediate")
@mock_rds2
def test_delete_non_existent_db_parameter_group():
conn = boto3.client("rds", region_name="us-west-2")
conn.delete_db_parameter_group.when.called_with(
DBParameterGroupName="non-existent"
).should.throw(ClientError)
@mock_rds2
def test_create_parameter_group_with_tags():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_parameter_group(
DBParameterGroupName="test",
DBParameterGroupFamily="mysql5.6",
Description="test parameter group",
Tags=[{"Key": "foo", "Value": "bar"}],
)
result = conn.list_tags_for_resource(
ResourceName="arn:aws:rds:us-west-2:1234567890:pg:test"
)
result["TagList"].should.equal([{"Value": "bar", "Key": "foo"}])
@mock_rds2
def test_create_db_with_iam_authentication():
conn = boto3.client("rds", region_name="us-west-2")
database = conn.create_db_instance(
DBInstanceIdentifier="rds",
DBInstanceClass="db.t1.micro",
Engine="postgres",
EnableIAMDatabaseAuthentication=True,
)
db_instance = database["DBInstance"]
db_instance["IAMDatabaseAuthenticationEnabled"].should.equal(True)
@mock_rds2
def test_create_db_snapshot_with_iam_authentication():
conn = boto3.client("rds", region_name="us-west-2")
conn.create_db_instance(
DBInstanceIdentifier="rds",
DBInstanceClass="db.t1.micro",
Engine="postgres",
EnableIAMDatabaseAuthentication=True,
)
snapshot = conn.create_db_snapshot(
DBInstanceIdentifier="rds", DBSnapshotIdentifier="snapshot"
).get("DBSnapshot")
snapshot.get("IAMDatabaseAuthenticationEnabled").should.equal(True)
@mock_rds2
def test_create_db_instance_with_tags():
client = boto3.client("rds", region_name="us-west-2")
tags = [{"Key": "foo", "Value": "bar"}, {"Key": "foo1", "Value": "bar1"}]
db_instance_identifier = "test-db-instance"
resp = client.create_db_instance(
DBInstanceIdentifier=db_instance_identifier,
Engine="postgres",
DBName="staging-postgres",
DBInstanceClass="db.m1.small",
Tags=tags,
)
resp["DBInstance"]["TagList"].should.equal(tags)
resp = client.describe_db_instances(DBInstanceIdentifier=db_instance_identifier)
resp["DBInstances"][0]["TagList"].should.equal(tags)