from __future__ import unicode_literals

from botocore.exceptions import ClientError, ParamValidationError
import boto3
import sure  # noqa

from moto import mock_ec2, mock_kms, mock_rds2


@mock_rds2
def test_create_database():
    """A created instance echoes the requested settings in the response."""
    conn = boto3.client('rds', region_name='us-west-2')
    database = conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        LicenseModel='license-included',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    db_instance = database['DBInstance']
    db_instance['AllocatedStorage'].should.equal(10)
    db_instance['DBInstanceClass'].should.equal("db.m1.small")
    db_instance['LicenseModel'].should.equal("license-included")
    db_instance['MasterUsername'].should.equal("root")
    db_instance['DBSecurityGroups'][0][
        'DBSecurityGroupName'].should.equal('my_sg')
    db_instance['DBInstanceArn'].should.equal(
        'arn:aws:rds:us-west-2:1234567890:db:db-master-1')
    db_instance['DBInstanceStatus'].should.equal('available')
    db_instance['DBName'].should.equal('staging-postgres')
    db_instance['DBInstanceIdentifier'].should.equal("db-master-1")
    db_instance['IAMDatabaseAuthenticationEnabled'].should.equal(False)
    db_instance['DbiResourceId'].should.contain("db-")
    db_instance['CopyTagsToSnapshot'].should.equal(False)


@mock_rds2
def test_stop_database():
    """Stopping works once; stopping an already stopped instance errors."""
    conn = boto3.client('rds', region_name='us-west-2')
    database = conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        LicenseModel='license-included',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    mydb = conn.describe_db_instances(
        DBInstanceIdentifier=database['DBInstance']['DBInstanceIdentifier']
    )['DBInstances'][0]
    mydb['DBInstanceStatus'].should.equal('available')
    db_id = mydb['DBInstanceIdentifier']

    # Stopping an available database should shut it down.
    response = conn.stop_db_instance(DBInstanceIdentifier=db_id)
    response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
    response['DBInstance']['DBInstanceStatus'].should.equal('stopped')

    # Stopping an already stopped database should raise a client error.
    conn.stop_db_instance.when.called_with(
        DBInstanceIdentifier=db_id).should.throw(ClientError)

    # Stopping a stopped database with a snapshot request should error, and
    # no snapshot should exist as a result of that call.
    conn.stop_db_instance.when.called_with(
        DBInstanceIdentifier=db_id,
        DBSnapshotIdentifier='rocky4570-rds-snap').should.throw(ClientError)
    response = conn.describe_db_snapshots()
    response['DBSnapshots'].should.equal([])


@mock_rds2
def test_start_database():
    """Start/stop transitions, including the snapshot taken on stop."""
    conn = boto3.client('rds', region_name='us-west-2')
    database = conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        LicenseModel='license-included',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    mydb = conn.describe_db_instances(
        DBInstanceIdentifier=database['DBInstance']['DBInstanceIdentifier']
    )['DBInstances'][0]
    mydb['DBInstanceStatus'].should.equal('available')
    db_id = mydb['DBInstanceIdentifier']

    # Starting an already started database should error.
    conn.start_db_instance.when.called_with(
        DBInstanceIdentifier=db_id).should.throw(ClientError)

    # Stop (creating a snapshot), then check both the status and the snapshot.
    response = conn.stop_db_instance(
        DBInstanceIdentifier=db_id,
        DBSnapshotIdentifier='rocky4570-rds-snap')
    response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
    response['DBInstance']['DBInstanceStatus'].should.equal('stopped')
    response = conn.describe_db_snapshots()
    response['DBSnapshots'][0]['DBSnapshotIdentifier'].should.equal(
        'rocky4570-rds-snap')

    # Start again: the instance should go from stopped to available.
    response = conn.start_db_instance(DBInstanceIdentifier=db_id)
    response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
    response['DBInstance']['DBInstanceStatus'].should.equal('available')

    # Starting the database should not remove the snapshot.
    response = conn.describe_db_snapshots()
    response['DBSnapshots'][0]['DBSnapshotIdentifier'].should.equal(
        'rocky4570-rds-snap')

    # Stopping with a snapshot identifier that already exists should error.
    conn.stop_db_instance.when.called_with(
        DBInstanceIdentifier=db_id,
        DBSnapshotIdentifier='rocky4570-rds-snap').should.throw(ClientError)

    # Stopping without requesting a snapshot should succeed.
    response = conn.stop_db_instance(DBInstanceIdentifier=db_id)
    response['ResponseMetadata']['HTTPStatusCode'].should.equal(200)
    response['DBInstance']['DBInstanceStatus'].should.equal('stopped')


@mock_rds2
def test_fail_to_stop_multi_az():
    """Stop and start requests on a Multi-AZ instance raise ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    database = conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        LicenseModel='license-included',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"],
        MultiAZ=True)
    mydb = conn.describe_db_instances(
        DBInstanceIdentifier=database['DBInstance']['DBInstanceIdentifier']
    )['DBInstances'][0]
    mydb['DBInstanceStatus'].should.equal('available')
    db_id = mydb['DBInstanceIdentifier']

    # Multi-AZ databases aren't allowed to be shut down at this time.
    conn.stop_db_instance.when.called_with(
        DBInstanceIdentifier=db_id).should.throw(ClientError)
    # Multi-AZ databases aren't allowed to be started up at this time.
    conn.start_db_instance.when.called_with(
        DBInstanceIdentifier=db_id).should.throw(ClientError)


@mock_rds2
def test_fail_to_stop_readreplica():
    """Stop and start requests on a read replica raise ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    # The create response is not needed; only the replica is inspected.
    conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        LicenseModel='license-included',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])

    replica = conn.create_db_instance_read_replica(
        DBInstanceIdentifier="db-replica-1",
        SourceDBInstanceIdentifier="db-master-1",
        DBInstanceClass="db.m1.small")
    mydb = conn.describe_db_instances(
        DBInstanceIdentifier=replica['DBInstance']['DBInstanceIdentifier']
    )['DBInstances'][0]
    mydb['DBInstanceStatus'].should.equal('available')
    db_id = mydb['DBInstanceIdentifier']

    # Read replicas are not allowed to be stopped at this time.
    conn.stop_db_instance.when.called_with(
        DBInstanceIdentifier=db_id).should.throw(ClientError)
    # Read replicas are not allowed to be started at this time.
    conn.start_db_instance.when.called_with(
        DBInstanceIdentifier=db_id).should.throw(ClientError)


@mock_rds2
def test_get_databases():
    """describe_db_instances lists every instance or filters by identifier."""
    conn = boto3.client('rds', region_name='us-west-2')

    instances = conn.describe_db_instances()
    list(instances['DBInstances']).should.have.length_of(0)

    # Engine and DBInstanceClass carry their proper values here; the two
    # were previously swapped ('postgres' is the engine, not the class).
    for identifier in ('db-master-1', 'db-master-2'):
        conn.create_db_instance(
            DBInstanceIdentifier=identifier,
            AllocatedStorage=10,
            DBInstanceClass='db.m1.small',
            Engine='postgres',
            MasterUsername='root',
            MasterUserPassword='hunter2',
            Port=1234,
            DBSecurityGroups=['my_sg'])

    instances = conn.describe_db_instances()
    list(instances['DBInstances']).should.have.length_of(2)

    instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
    list(instances['DBInstances']).should.have.length_of(1)
    instances['DBInstances'][0][
        'DBInstanceIdentifier'].should.equal("db-master-1")
    instances['DBInstances'][0]['DBInstanceArn'].should.equal(
        'arn:aws:rds:us-west-2:1234567890:db:db-master-1')


@mock_rds2
def test_get_databases_paginated():
    """With 51 instances, the default page holds 50 and Marker resumes."""
    conn = boto3.client('rds', region_name="us-west-2")

    for i in range(51):
        conn.create_db_instance(
            AllocatedStorage=5,
            Port=5432,
            DBInstanceIdentifier='rds%d' % i,
            DBInstanceClass='db.t1.micro',
            Engine='postgres')

    resp = conn.describe_db_instances()
    resp["DBInstances"].should.have.length_of(50)
    resp["Marker"].should.equal(
        resp["DBInstances"][-1]['DBInstanceIdentifier'])

    resp2 = conn.describe_db_instances(Marker=resp["Marker"])
    resp2["DBInstances"].should.have.length_of(1)

    resp3 = conn.describe_db_instances(MaxRecords=100)
    resp3["DBInstances"].should.have.length_of(51)


@mock_rds2
def test_describe_non_existant_database():
    """Describing an unknown instance identifier raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.describe_db_instances.when.called_with(
        DBInstanceIdentifier="not-a-db").should.throw(ClientError)


@mock_rds2
def test_modify_db_instance():
    """modify_db_instance changes the allocated storage of an instance."""
    conn = boto3.client('rds', region_name='us-west-2')
    # Engine / DBInstanceClass values were previously swapped.
    conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'])
    instances = conn.describe_db_instances(DBInstanceIdentifier='db-master-1')
    instances['DBInstances'][0]['AllocatedStorage'].should.equal(10)
    conn.modify_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=20,
        ApplyImmediately=True)
    instances = conn.describe_db_instances(DBInstanceIdentifier='db-master-1')
    instances['DBInstances'][0]['AllocatedStorage'].should.equal(20)


@mock_rds2
def test_rename_db_instance():
    """Renaming moves the instance from the old identifier to the new one."""
    conn = boto3.client('rds', region_name='us-west-2')
    # Engine / DBInstanceClass values were previously swapped.
    conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'])
    instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-1")
    list(instances['DBInstances']).should.have.length_of(1)
    conn.describe_db_instances.when.called_with(
        DBInstanceIdentifier="db-master-2").should.throw(ClientError)
    conn.modify_db_instance(
        DBInstanceIdentifier='db-master-1',
        NewDBInstanceIdentifier='db-master-2',
        ApplyImmediately=True)
    conn.describe_db_instances.when.called_with(
        DBInstanceIdentifier="db-master-1").should.throw(ClientError)
    instances = conn.describe_db_instances(DBInstanceIdentifier="db-master-2")
    list(instances['DBInstances']).should.have.length_of(1)


@mock_rds2
def test_modify_non_existant_database():
    """Modifying an unknown instance identifier raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.modify_db_instance.when.called_with(
        DBInstanceIdentifier='not-a-db',
        AllocatedStorage=20,
        ApplyImmediately=True).should.throw(ClientError)


@mock_rds2
def test_reboot_db_instance():
    """Rebooting returns the description of the rebooted instance."""
    conn = boto3.client('rds', region_name='us-west-2')
    # Engine / DBInstanceClass values were previously swapped.
    conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'])
    database = conn.reboot_db_instance(DBInstanceIdentifier='db-master-1')
    database['DBInstance']['DBInstanceIdentifier'].should.equal("db-master-1")


@mock_rds2
def test_reboot_non_existant_database():
    """Rebooting an unknown instance identifier raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.reboot_db_instance.when.called_with(
        DBInstanceIdentifier="not-a-db").should.throw(ClientError)


@mock_rds2
def test_delete_database():
    """Deleting removes the instance and keeps the requested final snapshot."""
    conn = boto3.client('rds', region_name='us-west-2')
    instances = conn.describe_db_instances()
    list(instances['DBInstances']).should.have.length_of(0)
    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'])
    instances = conn.describe_db_instances()
    list(instances['DBInstances']).should.have.length_of(1)

    conn.delete_db_instance(
        DBInstanceIdentifier="db-primary-1",
        FinalDBSnapshotIdentifier='primary-1-snapshot')

    instances = conn.describe_db_instances()
    list(instances['DBInstances']).should.have.length_of(0)

    # The final snapshot must have been saved.
    snapshots = conn.describe_db_snapshots(
        DBInstanceIdentifier="db-primary-1").get('DBSnapshots')
    snapshots[0].get('Engine').should.equal('postgres')


@mock_rds2
def test_delete_non_existant_database():
    """Deleting an unknown instance identifier raises ClientError.

    This test used to be defined twice in this module; the earlier copy
    requested the service name 'rds2' and was shadowed by the later one.
    A single definition using the 'rds' client is kept.
    """
    conn = boto3.client('rds', region_name='us-west-2')
    conn.delete_db_instance.when.called_with(
        DBInstanceIdentifier="not-a-db").should.throw(ClientError)


@mock_rds2
def test_create_db_snapshots():
    """Snapshots need an existing instance and start out with no tags."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_db_snapshot.when.called_with(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-1').should.throw(ClientError)

    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])

    snapshot = conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='g-1').get('DBSnapshot')

    snapshot.get('Engine').should.equal('postgres')
    snapshot.get('DBInstanceIdentifier').should.equal('db-primary-1')
    snapshot.get('DBSnapshotIdentifier').should.equal('g-1')
    result = conn.list_tags_for_resource(
        ResourceName=snapshot['DBSnapshotArn'])
    result['TagList'].should.equal([])


@mock_rds2
def test_create_db_snapshots_copy_tags():
    """With CopyTagsToSnapshot=True a snapshot carries the instance tags."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_db_snapshot.when.called_with(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-1').should.throw(ClientError)

    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"],
        CopyTagsToSnapshot=True,
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])

    snapshot = conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='g-1').get('DBSnapshot')

    snapshot.get('Engine').should.equal('postgres')
    snapshot.get('DBInstanceIdentifier').should.equal('db-primary-1')
    snapshot.get('DBSnapshotIdentifier').should.equal('g-1')
    result = conn.list_tags_for_resource(
        ResourceName=snapshot['DBSnapshotArn'])
    result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'},
                                    {'Value': 'bar1', 'Key': 'foo1'}])


@mock_rds2
def test_describe_db_snapshots():
    """Snapshots can be looked up by instance id or by snapshot id."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])

    created = conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-1').get('DBSnapshot')

    created.get('Engine').should.equal('postgres')

    by_database_id = conn.describe_db_snapshots(
        DBInstanceIdentifier='db-primary-1').get('DBSnapshots')
    by_snapshot_id = conn.describe_db_snapshots(
        DBSnapshotIdentifier='snapshot-1').get('DBSnapshots')
    by_snapshot_id.should.equal(by_database_id)

    snapshot = by_snapshot_id[0]
    snapshot.should.equal(created)
    snapshot.get('Engine').should.equal('postgres')

    conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-2')
    snapshots = conn.describe_db_snapshots(
        DBInstanceIdentifier='db-primary-1').get('DBSnapshots')
    snapshots.should.have.length_of(2)


@mock_rds2
def test_delete_db_snapshot():
    """A deleted snapshot can no longer be described."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-1')

    # Indexing [0] fails loudly if the snapshot was not actually created.
    conn.describe_db_snapshots(
        DBSnapshotIdentifier='snapshot-1').get('DBSnapshots')[0]
    conn.delete_db_snapshot(DBSnapshotIdentifier='snapshot-1')
    conn.describe_db_snapshots.when.called_with(
        DBSnapshotIdentifier='snapshot-1').should.throw(ClientError)


@mock_rds2
def test_create_option_group():
    """A created option group echoes the requested attributes."""
    conn = boto3.client('rds', region_name='us-west-2')
    option_group = conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    option_group['OptionGroup']['OptionGroupName'].should.equal('test')
    option_group['OptionGroup']['EngineName'].should.equal('mysql')
    option_group['OptionGroup'][
        'OptionGroupDescription'].should.equal('test option group')
    option_group['OptionGroup']['MajorEngineVersion'].should.equal('5.6')


@mock_rds2
def test_create_option_group_bad_engine_name():
    """An unknown engine name is rejected with ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group.when.called_with(
        OptionGroupName='test',
        EngineName='invalid_engine',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test invalid engine'
    ).should.throw(ClientError)


@mock_rds2
def test_create_option_group_bad_engine_major_version():
    """An unknown major engine version is rejected with ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group.when.called_with(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='6.6.6',
        OptionGroupDescription='test invalid engine version'
    ).should.throw(ClientError)


@mock_rds2
def test_create_option_group_empty_description():
    """An empty description is rejected with ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group.when.called_with(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='').should.throw(ClientError)


@mock_rds2
def test_create_option_group_duplicate():
    """Creating the same option group twice raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    conn.create_option_group.when.called_with(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group'
    ).should.throw(ClientError)


@mock_rds2
def test_describe_option_group():
    """An existing option group can be described by name."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    option_groups = conn.describe_option_groups(OptionGroupName='test')
    option_groups['OptionGroupsList'][0][
        'OptionGroupName'].should.equal('test')


@mock_rds2
def test_describe_non_existant_option_group():
    """Describing an unknown option group raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.describe_option_groups.when.called_with(
        OptionGroupName="not-a-option-group").should.throw(ClientError)


@mock_rds2
def test_delete_option_group():
    """A deleted option group can no longer be described."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    option_groups = conn.describe_option_groups(OptionGroupName='test')
    option_groups['OptionGroupsList'][0][
        'OptionGroupName'].should.equal('test')
    conn.delete_option_group(OptionGroupName='test')
    conn.describe_option_groups.when.called_with(
        OptionGroupName='test').should.throw(ClientError)


@mock_rds2
def test_delete_non_existant_option_group():
    """Deleting an unknown option group raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.delete_option_group.when.called_with(
        OptionGroupName='non-existant').should.throw(ClientError)


@mock_rds2
def test_describe_option_group_options():
    """Option listings filter by engine and version; unknown values error."""
    conn = boto3.client('rds', region_name='us-west-2')
    option_group_options = conn.describe_option_group_options(
        EngineName='sqlserver-ee')
    len(option_group_options['OptionGroupOptions']).should.equal(4)
    option_group_options = conn.describe_option_group_options(
        EngineName='sqlserver-ee', MajorEngineVersion='11.00')
    len(option_group_options['OptionGroupOptions']).should.equal(2)
    option_group_options = conn.describe_option_group_options(
        EngineName='mysql', MajorEngineVersion='5.6')
    len(option_group_options['OptionGroupOptions']).should.equal(1)
    conn.describe_option_group_options.when.called_with(
        EngineName='non-existent').should.throw(ClientError)
    conn.describe_option_group_options.when.called_with(
        EngineName='mysql',
        MajorEngineVersion='non-existent').should.throw(ClientError)


@mock_rds2
def test_modify_option_group():
    """Removing an option from a fresh group leaves it with no options."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    # TODO: create an option and validate it before removing it. It is
    # not yet clear how to add options to an option group through this call.
    result = conn.modify_option_group(
        OptionGroupName='test',
        OptionsToInclude=[],
        OptionsToRemove=['MEMCACHED'],
        ApplyImmediately=True)
    result['OptionGroup']['EngineName'].should.equal('mysql')
    result['OptionGroup']['Options'].should.equal([])
    result['OptionGroup']['OptionGroupName'].should.equal('test')


@mock_rds2
def test_modify_option_group_no_options():
    """Modifying without any options to include or remove raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    conn.modify_option_group.when.called_with(
        OptionGroupName='test').should.throw(ClientError)


@mock_rds2
def test_modify_non_existant_option_group():
    """A tuple entry in OptionsToInclude is expected to fail validation."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.modify_option_group.when.called_with(
        OptionGroupName='non-existant',
        OptionsToInclude=[(
            'OptionName',
            'Port',
            'DBSecurityGroupMemberships',
            'VpcSecurityGroupMemberships',
            'OptionSettings')]).should.throw(ParamValidationError)


@mock_rds2
def test_list_tags_invalid_arn():
    """Listing tags for a malformed ARN raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.list_tags_for_resource.when.called_with(
        ResourceName='arn:aws:rds:bad-arn').should.throw(ClientError)


@mock_rds2
def test_list_tags_db():
    """Tags given at instance creation are listed; unknown dbs list none."""
    conn = boto3.client('rds', region_name='us-west-2')
    result = conn.list_tags_for_resource(
        ResourceName='arn:aws:rds:us-west-2:1234567890:db:foo')
    result['TagList'].should.equal([])
    # Engine / DBInstanceClass values were previously swapped.
    test_instance = conn.create_db_instance(
        DBInstanceIdentifier='db-with-tags',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'],
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])
    result = conn.list_tags_for_resource(
        ResourceName=test_instance['DBInstance']['DBInstanceArn'])
    result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'},
                                    {'Value': 'bar1', 'Key': 'foo1'}])


@mock_rds2
def test_add_tags_db():
    """Adding one existing and one new key leaves three tags on the db."""
    conn = boto3.client('rds', region_name='us-west-2')
    arn = 'arn:aws:rds:us-west-2:1234567890:db:db-without-tags'
    # Engine / DBInstanceClass values were previously swapped.
    conn.create_db_instance(
        DBInstanceIdentifier='db-without-tags',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'],
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(2)
    conn.add_tags_to_resource(
        ResourceName=arn,
        Tags=[
            {'Key': 'foo', 'Value': 'fish'},
            {'Key': 'foo2', 'Value': 'bar2'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(3)


@mock_rds2
def test_remove_tags_db():
    """Removing one of two tag keys leaves a single tag on the db."""
    conn = boto3.client('rds', region_name='us-west-2')
    arn = 'arn:aws:rds:us-west-2:1234567890:db:db-with-tags'
    # Engine / DBInstanceClass values were previously swapped.
    conn.create_db_instance(
        DBInstanceIdentifier='db-with-tags',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=['my_sg'],
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(2)
    conn.remove_tags_from_resource(ResourceName=arn, TagKeys=['foo'])
    result = conn.list_tags_for_resource(ResourceName=arn)
    len(result['TagList']).should.equal(1)


@mock_rds2
def test_list_tags_snapshot():
    """Tags given at snapshot creation are listed; unknown ones list none."""
    conn = boto3.client('rds', region_name='us-west-2')
    result = conn.list_tags_for_resource(
        ResourceName='arn:aws:rds:us-west-2:1234567890:snapshot:foo')
    result['TagList'].should.equal([])
    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    snapshot = conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-with-tags',
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])
    result = conn.list_tags_for_resource(
        ResourceName=snapshot['DBSnapshot']['DBSnapshotArn'])
    result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'},
                                    {'Value': 'bar1', 'Key': 'foo1'}])


@mock_rds2
def test_add_tags_snapshot():
    """Adding one existing and one new key leaves three snapshot tags."""
    conn = boto3.client('rds', region_name='us-west-2')
    arn = 'arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-without-tags'
    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-without-tags',
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(2)
    conn.add_tags_to_resource(
        ResourceName=arn,
        Tags=[
            {'Key': 'foo', 'Value': 'fish'},
            {'Key': 'foo2', 'Value': 'bar2'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(3)


@mock_rds2
def test_remove_tags_snapshot():
    """Removing one of two tag keys leaves a single snapshot tag."""
    conn = boto3.client('rds', region_name='us-west-2')
    arn = 'arn:aws:rds:us-west-2:1234567890:snapshot:snapshot-with-tags'
    conn.create_db_instance(
        DBInstanceIdentifier='db-primary-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBName='staging-postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSecurityGroups=["my_sg"])
    conn.create_db_snapshot(
        DBInstanceIdentifier='db-primary-1',
        DBSnapshotIdentifier='snapshot-with-tags',
        Tags=[
            {'Key': 'foo', 'Value': 'bar'},
            {'Key': 'foo1', 'Value': 'bar1'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(2)
    conn.remove_tags_from_resource(ResourceName=arn, TagKeys=['foo'])
    result = conn.list_tags_for_resource(ResourceName=arn)
    len(result['TagList']).should.equal(1)


@mock_rds2
def test_add_tags_option_group():
    """Tags can be added to an option group that starts with none."""
    conn = boto3.client('rds', region_name='us-west-2')
    arn = 'arn:aws:rds:us-west-2:1234567890:og:test'
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(0)
    conn.add_tags_to_resource(
        ResourceName=arn,
        Tags=[
            {'Key': 'foo', 'Value': 'fish'},
            {'Key': 'foo2', 'Value': 'bar2'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(2)


@mock_rds2
def test_remove_tags_option_group():
    """Removing one of two tag keys leaves a single option-group tag."""
    conn = boto3.client('rds', region_name='us-west-2')
    arn = 'arn:aws:rds:us-west-2:1234567890:og:test'
    conn.create_option_group(
        OptionGroupName='test',
        EngineName='mysql',
        MajorEngineVersion='5.6',
        OptionGroupDescription='test option group')
    # The listing call is kept from the original; its result is not checked.
    conn.list_tags_for_resource(ResourceName=arn)
    conn.add_tags_to_resource(
        ResourceName=arn,
        Tags=[
            {'Key': 'foo', 'Value': 'fish'},
            {'Key': 'foo2', 'Value': 'bar2'},
        ])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(2)
    conn.remove_tags_from_resource(ResourceName=arn, TagKeys=['foo'])
    result = conn.list_tags_for_resource(ResourceName=arn)
    list(result['TagList']).should.have.length_of(1)


@mock_rds2
def test_create_database_security_group():
    """A created DB security group echoes its name and has no IP ranges."""
    conn = boto3.client('rds', region_name='us-west-2')

    result = conn.create_db_security_group(
        DBSecurityGroupName='db_sg',
        DBSecurityGroupDescription='DB Security Group')
    result['DBSecurityGroup']['DBSecurityGroupName'].should.equal("db_sg")
    result['DBSecurityGroup'][
        'DBSecurityGroupDescription'].should.equal("DB Security Group")
    result['DBSecurityGroup']['IPRanges'].should.equal([])


@mock_rds2
def test_get_security_groups():
    """DB security groups can be listed in full or filtered by name."""
    conn = boto3.client('rds', region_name='us-west-2')

    result = conn.describe_db_security_groups()
    result['DBSecurityGroups'].should.have.length_of(0)

    conn.create_db_security_group(
        DBSecurityGroupName='db_sg1',
        DBSecurityGroupDescription='DB Security Group')
    conn.create_db_security_group(
        DBSecurityGroupName='db_sg2',
        DBSecurityGroupDescription='DB Security Group')

    result = conn.describe_db_security_groups()
    result['DBSecurityGroups'].should.have.length_of(2)

    result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg1")
    result['DBSecurityGroups'].should.have.length_of(1)
    result['DBSecurityGroups'][0]['DBSecurityGroupName'].should.equal("db_sg1")


@mock_rds2
def test_get_non_existant_security_group():
    """Describing an unknown DB security group raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.describe_db_security_groups.when.called_with(
        DBSecurityGroupName="not-a-sg").should.throw(ClientError)


@mock_rds2
def test_delete_database_security_group():
    """A deleted DB security group disappears from the listing."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_db_security_group(
        DBSecurityGroupName='db_sg',
        DBSecurityGroupDescription='DB Security Group')

    result = conn.describe_db_security_groups()
    result['DBSecurityGroups'].should.have.length_of(1)

    conn.delete_db_security_group(DBSecurityGroupName="db_sg")
    result = conn.describe_db_security_groups()
    result['DBSecurityGroups'].should.have.length_of(0)


@mock_rds2
def test_delete_non_existant_security_group():
    """Deleting an unknown DB security group raises ClientError."""
    conn = boto3.client('rds', region_name='us-west-2')
    conn.delete_db_security_group.when.called_with(
        DBSecurityGroupName="not-a-db").should.throw(ClientError)


@mock_rds2
def test_security_group_authorize():
    """Each authorized CIDR is appended to the group's IPRanges."""
    conn = boto3.client('rds', region_name='us-west-2')
    security_group = conn.create_db_security_group(
        DBSecurityGroupName='db_sg',
        DBSecurityGroupDescription='DB Security Group')
    security_group['DBSecurityGroup']['IPRanges'].should.equal([])

    conn.authorize_db_security_group_ingress(
        DBSecurityGroupName='db_sg', CIDRIP='10.3.2.45/32')

    result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg")
    result['DBSecurityGroups'][0]['IPRanges'].should.have.length_of(1)
    result['DBSecurityGroups'][0]['IPRanges'].should.equal(
        [{'Status': 'authorized', 'CIDRIP': '10.3.2.45/32'}])

    conn.authorize_db_security_group_ingress(
        DBSecurityGroupName='db_sg', CIDRIP='10.3.2.46/32')
    result = conn.describe_db_security_groups(DBSecurityGroupName="db_sg")
    result['DBSecurityGroups'][0]['IPRanges'].should.have.length_of(2)
    result['DBSecurityGroups'][0]['IPRanges'].should.equal([
        {'Status': 'authorized', 'CIDRIP': '10.3.2.45/32'},
        {'Status': 'authorized', 'CIDRIP': '10.3.2.46/32'},
    ])


@mock_rds2
def test_add_security_group_to_database():
    """modify_db_instance attaches a DB security group to an instance."""
    conn = boto3.client('rds', region_name='us-west-2')

    # Engine / DBInstanceClass values were previously swapped.
    conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        DBInstanceClass='db.m1.small',
        Engine='postgres',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234)

    result = conn.describe_db_instances()
    result['DBInstances'][0]['DBSecurityGroups'].should.equal([])
    conn.create_db_security_group(
        DBSecurityGroupName='db_sg',
        DBSecurityGroupDescription='DB Security Group')
    conn.modify_db_instance(
        DBInstanceIdentifier='db-master-1',
        DBSecurityGroups=['db_sg'])
    result = conn.describe_db_instances()
    result['DBInstances'][0]['DBSecurityGroups'][0][
        'DBSecurityGroupName'].should.equal('db_sg')


@mock_rds2
def test_list_tags_security_group():
    """Tags given at security-group creation are listed for its ARN."""
    conn = boto3.client('rds', region_name='us-west-2')
    result = conn.describe_db_subnet_groups()
    result['DBSubnetGroups'].should.have.length_of(0)

    security_group = conn.create_db_security_group(
        DBSecurityGroupName="db_sg",
        DBSecurityGroupDescription='DB Security Group',
        Tags=[{'Value': 'bar', 'Key': 'foo'},
              {'Value': 'bar1', 'Key': 'foo1'}]
    )['DBSecurityGroup']['DBSecurityGroupName']
    resource = 'arn:aws:rds:us-west-2:1234567890:secgrp:{0}'.format(
        security_group)
    result = conn.list_tags_for_resource(ResourceName=resource)
    result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'},
                                    {'Value': 'bar1', 'Key': 'foo1'}])


@mock_rds2
def test_add_tags_security_group():
    """Tags added to a security group are returned when listing them."""
    conn = boto3.client('rds', region_name='us-west-2')
    result = conn.describe_db_subnet_groups()
    result['DBSubnetGroups'].should.have.length_of(0)

    security_group = conn.create_db_security_group(
        DBSecurityGroupName="db_sg",
        DBSecurityGroupDescription='DB Security Group'
    )['DBSecurityGroup']['DBSecurityGroupName']

    resource = 'arn:aws:rds:us-west-2:1234567890:secgrp:{0}'.format(
        security_group)
    conn.add_tags_to_resource(
        ResourceName=resource,
        Tags=[{'Value': 'bar', 'Key': 'foo'},
              {'Value': 'bar1', 'Key': 'foo1'}])

    result = conn.list_tags_for_resource(ResourceName=resource)
    result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'},
                                    {'Value': 'bar1', 'Key': 'foo1'}])


@mock_rds2
def test_remove_tags_security_group():
    """Removing one tag key from a security group leaves the other tag."""
    conn = boto3.client('rds', region_name='us-west-2')
    result = conn.describe_db_subnet_groups()
    result['DBSubnetGroups'].should.have.length_of(0)

    security_group = conn.create_db_security_group(
        DBSecurityGroupName="db_sg",
        DBSecurityGroupDescription='DB Security Group',
        Tags=[{'Value': 'bar', 'Key': 'foo'},
              {'Value': 'bar1', 'Key': 'foo1'}]
    )['DBSecurityGroup']['DBSecurityGroupName']

    resource = 'arn:aws:rds:us-west-2:1234567890:secgrp:{0}'.format(
        security_group)
    conn.remove_tags_from_resource(ResourceName=resource, TagKeys=['foo'])

    result = conn.list_tags_for_resource(ResourceName=resource)
    result['TagList'].should.equal([{'Value': 'bar1', 'Key': 'foo1'}])


@mock_ec2
@mock_rds2
def test_create_database_subnet_group():
    """A DB subnet group reports the subnets it was created with."""
    vpc_conn = boto3.client('ec2', 'us-west-2')
    vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc']
    subnet1 = vpc_conn.create_subnet(
        VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet']
    subnet2 = vpc_conn.create_subnet(
        VpcId=vpc['VpcId'], CidrBlock='10.0.2.0/24')['Subnet']

    subnet_ids = [subnet1['SubnetId'], subnet2['SubnetId']]
    conn = boto3.client('rds', region_name='us-west-2')
    result = conn.create_db_subnet_group(
        DBSubnetGroupName='db_subnet',
        DBSubnetGroupDescription='my db subnet',
        SubnetIds=subnet_ids)
    result['DBSubnetGroup']['DBSubnetGroupName'].should.equal("db_subnet")
    result['DBSubnetGroup'][
        'DBSubnetGroupDescription'].should.equal("my db subnet")
    subnets = result['DBSubnetGroup']['Subnets']
    subnet_group_ids = [subnets[0]['SubnetIdentifier'],
                        subnets[1]['SubnetIdentifier']]
    list(subnet_group_ids).should.equal(subnet_ids)


@mock_ec2
@mock_rds2
def test_create_database_in_subnet_group():
    vpc_conn = boto3.client('ec2', 'us-west-2')
    vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc']
    subnet = vpc_conn.create_subnet(
        VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet']

    conn = boto3.client('rds', region_name='us-west-2')
    conn.create_db_subnet_group(
        DBSubnetGroupName='db_subnet1',
        DBSubnetGroupDescription='my db subnet',
        SubnetIds=[subnet['SubnetId']])
    conn.create_db_instance(
        DBInstanceIdentifier='db-master-1',
        AllocatedStorage=10,
        Engine='postgres',
        DBInstanceClass='db.m1.small',
        MasterUsername='root',
        MasterUserPassword='hunter2',
        Port=1234,
        DBSubnetGroupName='db_subnet1')
    result = conn.describe_db_instances(DBInstanceIdentifier='db-master-1')
result['DBInstances'][0]['DBSubnetGroup'][ 'DBSubnetGroupName'].should.equal('db_subnet1') @mock_ec2 @mock_rds2 def test_describe_database_subnet_group(): vpc_conn = boto3.client('ec2', 'us-west-2') vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc'] subnet = vpc_conn.create_subnet( VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet'] conn = boto3.client('rds', region_name='us-west-2') conn.create_db_subnet_group(DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription='my db subnet', SubnetIds=[subnet['SubnetId']]) conn.create_db_subnet_group(DBSubnetGroupName='db_subnet2', DBSubnetGroupDescription='my db subnet', SubnetIds=[subnet['SubnetId']]) resp = conn.describe_db_subnet_groups() resp['DBSubnetGroups'].should.have.length_of(2) subnets = resp['DBSubnetGroups'][0]['Subnets'] subnets.should.have.length_of(1) list(conn.describe_db_subnet_groups(DBSubnetGroupName="db_subnet1") ['DBSubnetGroups']).should.have.length_of(1) conn.describe_db_subnet_groups.when.called_with( DBSubnetGroupName="not-a-subnet").should.throw(ClientError) @mock_ec2 @mock_rds2 def test_delete_database_subnet_group(): vpc_conn = boto3.client('ec2', 'us-west-2') vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc'] subnet = vpc_conn.create_subnet( VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet'] conn = boto3.client('rds', region_name='us-west-2') result = conn.describe_db_subnet_groups() result['DBSubnetGroups'].should.have.length_of(0) conn.create_db_subnet_group(DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription='my db subnet', SubnetIds=[subnet['SubnetId']]) result = conn.describe_db_subnet_groups() result['DBSubnetGroups'].should.have.length_of(1) conn.delete_db_subnet_group(DBSubnetGroupName="db_subnet1") result = conn.describe_db_subnet_groups() result['DBSubnetGroups'].should.have.length_of(0) conn.delete_db_subnet_group.when.called_with( DBSubnetGroupName="db_subnet1").should.throw(ClientError) @mock_ec2 @mock_rds2 def test_list_tags_database_subnet_group(): 
vpc_conn = boto3.client('ec2', 'us-west-2') vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc'] subnet = vpc_conn.create_subnet( VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet'] conn = boto3.client('rds', region_name='us-west-2') result = conn.describe_db_subnet_groups() result['DBSubnetGroups'].should.have.length_of(0) subnet = conn.create_db_subnet_group(DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription='my db subnet', SubnetIds=[subnet['SubnetId']], Tags=[{'Value': 'bar', 'Key': 'foo'}, {'Value': 'bar1', 'Key': 'foo1'}])['DBSubnetGroup']['DBSubnetGroupName'] result = conn.list_tags_for_resource( ResourceName='arn:aws:rds:us-west-2:1234567890:subgrp:{0}'.format(subnet)) result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'}, {'Value': 'bar1', 'Key': 'foo1'}]) @mock_ec2 @mock_rds2 def test_add_tags_database_subnet_group(): vpc_conn = boto3.client('ec2', 'us-west-2') vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc'] subnet = vpc_conn.create_subnet( VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet'] conn = boto3.client('rds', region_name='us-west-2') result = conn.describe_db_subnet_groups() result['DBSubnetGroups'].should.have.length_of(0) subnet = conn.create_db_subnet_group(DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription='my db subnet', SubnetIds=[subnet['SubnetId']], Tags=[])['DBSubnetGroup']['DBSubnetGroupName'] resource = 'arn:aws:rds:us-west-2:1234567890:subgrp:{0}'.format(subnet) conn.add_tags_to_resource(ResourceName=resource, Tags=[{'Value': 'bar', 'Key': 'foo'}, {'Value': 'bar1', 'Key': 'foo1'}]) result = conn.list_tags_for_resource(ResourceName=resource) result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'}, {'Value': 'bar1', 'Key': 'foo1'}]) @mock_ec2 @mock_rds2 def test_remove_tags_database_subnet_group(): vpc_conn = boto3.client('ec2', 'us-west-2') vpc = vpc_conn.create_vpc(CidrBlock='10.0.0.0/16')['Vpc'] subnet = vpc_conn.create_subnet( VpcId=vpc['VpcId'], CidrBlock='10.0.1.0/24')['Subnet'] 
conn = boto3.client('rds', region_name='us-west-2') result = conn.describe_db_subnet_groups() result['DBSubnetGroups'].should.have.length_of(0) subnet = conn.create_db_subnet_group(DBSubnetGroupName="db_subnet1", DBSubnetGroupDescription='my db subnet', SubnetIds=[subnet['SubnetId']], Tags=[{'Value': 'bar', 'Key': 'foo'}, {'Value': 'bar1', 'Key': 'foo1'}])['DBSubnetGroup']['DBSubnetGroupName'] resource = 'arn:aws:rds:us-west-2:1234567890:subgrp:{0}'.format(subnet) conn.remove_tags_from_resource(ResourceName=resource, TagKeys=['foo']) result = conn.list_tags_for_resource(ResourceName=resource) result['TagList'].should.equal([{'Value': 'bar1', 'Key': 'foo1'}]) @mock_rds2 def test_create_database_replica(): conn = boto3.client('rds', region_name='us-west-2') database = conn.create_db_instance(DBInstanceIdentifier='db-master-1', AllocatedStorage=10, Engine='postgres', DBInstanceClass='db.m1.small', MasterUsername='root', MasterUserPassword='hunter2', Port=1234, DBSecurityGroups=["my_sg"]) replica = conn.create_db_instance_read_replica(DBInstanceIdentifier="db-replica-1", SourceDBInstanceIdentifier="db-master-1", DBInstanceClass="db.m1.small") replica['DBInstance'][ 'ReadReplicaSourceDBInstanceIdentifier'].should.equal('db-master-1') replica['DBInstance']['DBInstanceClass'].should.equal('db.m1.small') replica['DBInstance']['DBInstanceIdentifier'].should.equal('db-replica-1') master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") master['DBInstances'][0]['ReadReplicaDBInstanceIdentifiers'].should.equal([ 'db-replica-1']) conn.delete_db_instance( DBInstanceIdentifier="db-replica-1", SkipFinalSnapshot=True) master = conn.describe_db_instances(DBInstanceIdentifier="db-master-1") master['DBInstances'][0][ 'ReadReplicaDBInstanceIdentifiers'].should.equal([]) @mock_rds2 @mock_kms def test_create_database_with_encrypted_storage(): kms_conn = boto3.client('kms', region_name='us-west-2') key = kms_conn.create_key(Policy='my RDS encryption policy', 
Description='RDS encryption key', KeyUsage='ENCRYPT_DECRYPT') conn = boto3.client('rds', region_name='us-west-2') database = conn.create_db_instance(DBInstanceIdentifier='db-master-1', AllocatedStorage=10, Engine='postgres', DBInstanceClass='db.m1.small', MasterUsername='root', MasterUserPassword='hunter2', Port=1234, DBSecurityGroups=["my_sg"], StorageEncrypted=True, KmsKeyId=key['KeyMetadata']['KeyId']) database['DBInstance']['StorageEncrypted'].should.equal(True) database['DBInstance']['KmsKeyId'].should.equal( key['KeyMetadata']['KeyId']) @mock_rds2 def test_create_db_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') db_parameter_group = conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') db_parameter_group['DBParameterGroup'][ 'DBParameterGroupName'].should.equal('test') db_parameter_group['DBParameterGroup'][ 'DBParameterGroupFamily'].should.equal('mysql5.6') db_parameter_group['DBParameterGroup'][ 'Description'].should.equal('test parameter group') @mock_rds2 def test_create_db_instance_with_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') db_parameter_group = conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') database = conn.create_db_instance(DBInstanceIdentifier='db-master-1', AllocatedStorage=10, Engine='mysql', DBInstanceClass='db.m1.small', DBParameterGroupName='test', MasterUsername='root', MasterUserPassword='hunter2', Port=1234) len(database['DBInstance']['DBParameterGroups']).should.equal(1) database['DBInstance']['DBParameterGroups'][0][ 'DBParameterGroupName'].should.equal('test') database['DBInstance']['DBParameterGroups'][0][ 'ParameterApplyStatus'].should.equal('in-sync') @mock_rds2 def test_create_database_with_default_port(): conn = boto3.client('rds', region_name='us-west-2') database = 
conn.create_db_instance(DBInstanceIdentifier='db-master-1', AllocatedStorage=10, Engine='postgres', DBInstanceClass='db.m1.small', MasterUsername='root', MasterUserPassword='hunter2', DBSecurityGroups=["my_sg"]) database['DBInstance']['Endpoint']['Port'].should.equal(5432) @mock_rds2 def test_modify_db_instance_with_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') database = conn.create_db_instance(DBInstanceIdentifier='db-master-1', AllocatedStorage=10, Engine='mysql', DBInstanceClass='db.m1.small', MasterUsername='root', MasterUserPassword='hunter2', Port=1234) len(database['DBInstance']['DBParameterGroups']).should.equal(1) database['DBInstance']['DBParameterGroups'][0][ 'DBParameterGroupName'].should.equal('default.mysql5.6') database['DBInstance']['DBParameterGroups'][0][ 'ParameterApplyStatus'].should.equal('in-sync') db_parameter_group = conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') conn.modify_db_instance(DBInstanceIdentifier='db-master-1', DBParameterGroupName='test', ApplyImmediately=True) database = conn.describe_db_instances( DBInstanceIdentifier='db-master-1')['DBInstances'][0] len(database['DBParameterGroups']).should.equal(1) database['DBParameterGroups'][0][ 'DBParameterGroupName'].should.equal('test') database['DBParameterGroups'][0][ 'ParameterApplyStatus'].should.equal('in-sync') @mock_rds2 def test_create_db_parameter_group_empty_description(): conn = boto3.client('rds', region_name='us-west-2') conn.create_db_parameter_group.when.called_with(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='').should.throw(ClientError) @mock_rds2 def test_create_db_parameter_group_duplicate(): conn = boto3.client('rds', region_name='us-west-2') conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') 
conn.create_db_parameter_group.when.called_with(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group').should.throw(ClientError) @mock_rds2 def test_describe_db_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') db_parameter_groups = conn.describe_db_parameter_groups( DBParameterGroupName='test') db_parameter_groups['DBParameterGroups'][0][ 'DBParameterGroupName'].should.equal('test') @mock_rds2 def test_describe_non_existant_db_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') db_parameter_groups = conn.describe_db_parameter_groups( DBParameterGroupName='test') len(db_parameter_groups['DBParameterGroups']).should.equal(0) @mock_rds2 def test_delete_db_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') db_parameter_groups = conn.describe_db_parameter_groups( DBParameterGroupName='test') db_parameter_groups['DBParameterGroups'][0][ 'DBParameterGroupName'].should.equal('test') conn.delete_db_parameter_group(DBParameterGroupName='test') db_parameter_groups = conn.describe_db_parameter_groups( DBParameterGroupName='test') len(db_parameter_groups['DBParameterGroups']).should.equal(0) @mock_rds2 def test_modify_db_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group') modify_result = conn.modify_db_parameter_group(DBParameterGroupName='test', Parameters=[{ 'ParameterName': 'foo', 'ParameterValue': 'foo_val', 'Description': 'test param', 'ApplyMethod': 'immediate' }] ) modify_result['DBParameterGroupName'].should.equal('test') db_parameters = 
conn.describe_db_parameters(DBParameterGroupName='test') db_parameters['Parameters'][0]['ParameterName'].should.equal('foo') db_parameters['Parameters'][0]['ParameterValue'].should.equal('foo_val') db_parameters['Parameters'][0]['Description'].should.equal('test param') db_parameters['Parameters'][0]['ApplyMethod'].should.equal('immediate') @mock_rds2 def test_delete_non_existant_db_parameter_group(): conn = boto3.client('rds', region_name='us-west-2') conn.delete_db_parameter_group.when.called_with( DBParameterGroupName='non-existant').should.throw(ClientError) @mock_rds2 def test_create_parameter_group_with_tags(): conn = boto3.client('rds', region_name='us-west-2') conn.create_db_parameter_group(DBParameterGroupName='test', DBParameterGroupFamily='mysql5.6', Description='test parameter group', Tags=[{ 'Key': 'foo', 'Value': 'bar', }]) result = conn.list_tags_for_resource( ResourceName='arn:aws:rds:us-west-2:1234567890:pg:test') result['TagList'].should.equal([{'Value': 'bar', 'Key': 'foo'}])