Fix: DeleteCluster behavior with SkipFinalClusterSnapshot

Original code was trying to raise a ClientError directly.  Change to
appropriate Redshift exception class.

* Fix test assertion for `boto`.
* Add test coverage for `boto3`.
This commit is contained in:
Brian Pandola 2020-11-21 23:21:15 -08:00 committed by Bert Blommers
parent 555be78f6e
commit 49c6e65603
3 changed files with 83 additions and 6 deletions

View File

@ -143,3 +143,10 @@ class ClusterAlreadyExistsFaultError(RedshiftClientError):
super(ClusterAlreadyExistsFaultError, self).__init__(
"ClusterAlreadyExists", "Cluster already exists"
)
class InvalidParameterCombinationError(RedshiftClientError):
def __init__(self, message):
super(InvalidParameterCombinationError, self).__init__(
"InvalidParameterCombination", message
)

View File

@ -4,7 +4,7 @@ import copy
import datetime
from boto3 import Session
from botocore.exceptions import ClientError
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
@ -17,6 +17,7 @@ from .exceptions import (
ClusterSnapshotAlreadyExistsError,
ClusterSnapshotNotFoundError,
ClusterSubnetGroupNotFoundError,
InvalidParameterCombinationError,
InvalidParameterValueError,
InvalidSubnetError,
ResourceNotFoundFaultError,
@ -655,10 +656,8 @@ class RedshiftBackend(BaseBackend):
cluster_skip_final_snapshot is False
and cluster_snapshot_identifer is None
):
raise ClientError(
"InvalidParameterValue",
"FinalSnapshotIdentifier is required for Snapshot copy "
"when SkipFinalSnapshot is False",
raise InvalidParameterCombinationError(
"FinalClusterSnapshotIdentifier is required unless SkipFinalClusterSnapshot is specified."
)
elif (
cluster_skip_final_snapshot is False

View File

@ -424,7 +424,7 @@ def test_delete_cluster():
)
conn.delete_cluster.when.called_with(cluster_identifier, False).should.throw(
AttributeError
boto.exception.JSONResponseError
)
clusters = conn.describe_clusters()["DescribeClustersResponse"][
@ -1363,3 +1363,74 @@ def test_create_duplicate_cluster_fails():
client.create_cluster.when.called_with(**kwargs).should.throw(
ClientError, "ClusterAlreadyExists"
)
@mock_redshift
def test_delete_cluster_with_final_snapshot():
client = boto3.client("redshift", region_name="us-east-1")
with pytest.raises(ClientError) as ex:
client.delete_cluster(ClusterIdentifier="non-existent")
ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
cluster_identifier = "my_cluster"
client.create_cluster(
ClusterIdentifier=cluster_identifier,
ClusterType="single-node",
DBName="test",
MasterUsername="user",
MasterUserPassword="password",
NodeType="ds2.xlarge",
)
with pytest.raises(ClientError) as ex:
client.delete_cluster(
ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=False
)
ex.value.response["Error"]["Code"].should.equal("InvalidParameterCombination")
ex.value.response["Error"]["Message"].should.contain(
"FinalClusterSnapshotIdentifier is required unless SkipFinalClusterSnapshot is specified."
)
snapshot_identifier = "my_snapshot"
client.delete_cluster(
ClusterIdentifier=cluster_identifier,
SkipFinalClusterSnapshot=False,
FinalClusterSnapshotIdentifier=snapshot_identifier,
)
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
resp["Snapshots"].should.have.length_of(1)
resp["Snapshots"][0]["SnapshotIdentifier"].should.equal(snapshot_identifier)
resp["Snapshots"][0]["SnapshotType"].should.equal("manual")
with pytest.raises(ClientError) as ex:
client.describe_clusters(ClusterIdentifier=cluster_identifier)
ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")
@mock_redshift
def test_delete_cluster_without_final_snapshot():
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "my_cluster"
client.create_cluster(
ClusterIdentifier=cluster_identifier,
ClusterType="single-node",
DBName="test",
MasterUsername="user",
MasterUserPassword="password",
NodeType="ds2.xlarge",
)
client.delete_cluster(
ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True
)
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
resp["Snapshots"].should.have.length_of(0)
with pytest.raises(ClientError) as ex:
client.describe_clusters(ClusterIdentifier=cluster_identifier)
ex.value.response["Error"]["Code"].should.equal("ClusterNotFound")
ex.value.response["Error"]["Message"].should.match(r"Cluster .+ not found.")