Updated delete_cluster() for redshift (#2186)

* Updated the deprecated decorator to allow the "SkipFinalClusterSnapshot" option that aws supports.

* FIxed logical mistake on the delete_cluster

* Removed an unused exception I put in
This commit is contained in:
redspart 2019-05-20 18:56:23 -04:00 committed by Terry Cain
parent 8cb4db1896
commit 1fd71fd45a
3 changed files with 50 additions and 11 deletions

View File

@ -531,14 +531,37 @@ class RedshiftBackend(BaseBackend):
setattr(cluster, key, value)
if new_cluster_identifier:
self.delete_cluster(cluster_identifier)
dic = {
"cluster_identifier": cluster_identifier,
"skip_final_snapshot": True,
"final_cluster_snapshot_identifier": None
}
self.delete_cluster(**dic)
cluster.cluster_identifier = new_cluster_identifier
self.clusters[new_cluster_identifier] = cluster
return cluster
def delete_cluster(self, cluster_identifier):
def delete_cluster(self, **cluster_kwargs):
cluster_identifier = cluster_kwargs.pop("cluster_identifier")
cluster_skip_final_snapshot = cluster_kwargs.pop("skip_final_snapshot")
cluster_snapshot_identifer = cluster_kwargs.pop("final_cluster_snapshot_identifier")
if cluster_identifier in self.clusters:
if cluster_skip_final_snapshot is False and cluster_snapshot_identifer is None:
raise ClientError(
"InvalidParameterValue",
'FinalSnapshotIdentifier is required for Snapshot copy '
'when SkipFinalSnapshot is False'
)
elif cluster_skip_final_snapshot is False and cluster_snapshot_identifer is not None: # create snapshot
cluster = self.describe_clusters(cluster_identifier)[0]
self.create_cluster_snapshot(
cluster_identifier,
cluster_snapshot_identifer,
cluster.region,
cluster.tags)
return self.clusters.pop(cluster_identifier)
raise ClusterNotFoundError(cluster_identifier)

View File

@ -240,8 +240,13 @@ class RedshiftResponse(BaseResponse):
})
def delete_cluster(self):
cluster_identifier = self._get_param("ClusterIdentifier")
cluster = self.redshift_backend.delete_cluster(cluster_identifier)
request_kwargs = {
"cluster_identifier": self._get_param("ClusterIdentifier"),
"final_cluster_snapshot_identifier": self._get_param("FinalClusterSnapshotIdentifier"),
"skip_final_snapshot": self._get_bool_param("SkipFinalClusterSnapshot")
}
cluster = self.redshift_backend.delete_cluster(**request_kwargs)
return self.get_response({
"DeleteClusterResponse": {

View File

@ -9,7 +9,7 @@ from boto.redshift.exceptions import (
ClusterParameterGroupNotFound,
ClusterSecurityGroupNotFound,
ClusterSubnetGroupNotFound,
InvalidSubnet,
InvalidSubnet
)
from botocore.exceptions import (
ClientError
@ -339,7 +339,7 @@ def test_create_cluster_with_vpc_security_groups_boto3():
@mock_redshift
def test_create_cluster_with_iam_roles():
iam_roles_arn = ['arn:aws:iam:::role/my-iam-role',]
iam_roles_arn = ['arn:aws:iam:::role/my-iam-role', ]
client = boto3.client('redshift', region_name='us-east-1')
cluster_id = 'my_cluster'
client.create_cluster(
@ -385,29 +385,41 @@ def test_describe_non_existent_cluster():
conn.describe_clusters.when.called_with(
"not-a-cluster").should.throw(ClusterNotFound)
@mock_redshift_deprecated
def test_delete_cluster():
conn = boto.connect_redshift()
cluster_identifier = 'my_cluster'
cluster_identifier = "my_cluster"
snapshot_identifier = "my_snapshot"
conn.create_cluster(
cluster_identifier,
node_type='single-node',
node_type="single-node",
master_username="username",
master_user_password="password",
)
conn.delete_cluster.when.called_with(cluster_identifier, False).should.throw(AttributeError)
clusters = conn.describe_clusters()['DescribeClustersResponse'][
'DescribeClustersResult']['Clusters']
list(clusters).should.have.length_of(1)
conn.delete_cluster(cluster_identifier)
conn.delete_cluster(
cluster_identifier=cluster_identifier,
skip_final_cluster_snapshot=False,
final_cluster_snapshot_identifier=snapshot_identifier
)
clusters = conn.describe_clusters()['DescribeClustersResponse'][
'DescribeClustersResult']['Clusters']
list(clusters).should.have.length_of(0)
snapshots = conn.describe_cluster_snapshots()["DescribeClusterSnapshotsResponse"][
"DescribeClusterSnapshotsResult"]["Snapshots"]
list(snapshots).should.have.length_of(1)
assert snapshot_identifier in snapshots[0]["SnapshotIdentifier"]
# Delete invalid id
conn.delete_cluster.when.called_with(
"not-a-cluster").should.throw(ClusterNotFound)
@ -643,7 +655,6 @@ def test_delete_cluster_parameter_group():
"not-a-parameter-group").should.throw(ClusterParameterGroupNotFound)
@mock_redshift
def test_create_cluster_snapshot_of_non_existent_cluster():
client = boto3.client('redshift', region_name='us-east-1')