These changes address issue 4834. (#4835)

This commit is contained in:
mgshirali 2022-02-07 20:40:12 +05:30 committed by GitHub
parent 500ed1a90b
commit 60e26e5892
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 253 additions and 12 deletions

View File

@ -155,3 +155,13 @@ class ClusterSecurityGroupNotFoundFaultError(RedshiftClientError):
"ClusterSecurityGroupNotFoundFault",
"The cluster security group name does not refer to an existing cluster security group.",
)
class InvalidClusterSnapshotStateFaultError(RedshiftClientError):
def __init__(self, snapshot_identifier):
super().__init__(
"InvalidClusterSnapshotStateFault",
"Cannot delete the snapshot {0} because only manual snapshots may be deleted".format(
snapshot_identifier
),
)

View File

@ -25,6 +25,7 @@ from .exceptions import (
SnapshotCopyGrantNotFoundFaultError,
UnknownSnapshotCopyRegionFaultError,
ClusterSecurityGroupNotFoundFaultError,
InvalidClusterSnapshotStateFaultError,
)
@ -505,12 +506,18 @@ class Snapshot(TaggableResourceMixin, BaseModel):
resource_type = "snapshot"
def __init__(
self, cluster, snapshot_identifier, region_name, tags=None, iam_roles_arn=None
self,
cluster,
snapshot_identifier,
region_name,
tags=None,
iam_roles_arn=None,
snapshot_type="manual",
):
super().__init__(region_name, tags)
self.cluster = copy.copy(cluster)
self.snapshot_identifier = snapshot_identifier
self.snapshot_type = "manual"
self.snapshot_type = snapshot_type
self.status = "available"
self.create_time = iso_8601_datetime_with_milliseconds(datetime.datetime.now())
self.iam_roles_arn = iam_roles_arn or []
@ -638,6 +645,17 @@ class RedshiftBackend(BaseBackend):
raise ClusterAlreadyExistsFaultError()
cluster = Cluster(self, **cluster_kwargs)
self.clusters[cluster_identifier] = cluster
snapshot_id = "rs:{}-{}".format(
cluster_identifier, datetime.datetime.utcnow().strftime("%Y-%m-%d-%H-%M")
)
# Automated snapshots don't copy over the tags
self.create_cluster_snapshot(
cluster_identifier,
snapshot_id,
cluster.region,
None,
snapshot_type="automated",
)
return cluster
def pause_cluster(self, cluster_id):
@ -696,6 +714,14 @@ class RedshiftBackend(BaseBackend):
return cluster
def delete_automated_snapshots(self, cluster_identifier):
snapshots = self.describe_cluster_snapshots(
cluster_identifier=cluster_identifier
)
for snapshot in snapshots:
if snapshot.snapshot_type == "automated":
self.snapshots.pop(snapshot.snapshot_identifier)
def delete_cluster(self, **cluster_kwargs):
cluster_identifier = cluster_kwargs.pop("cluster_identifier")
cluster_skip_final_snapshot = cluster_kwargs.pop("skip_final_snapshot")
@ -722,7 +748,7 @@ class RedshiftBackend(BaseBackend):
cluster.region,
cluster.tags,
)
self.delete_automated_snapshots(cluster_identifier)
return self.clusters.pop(cluster_identifier)
raise ClusterNotFoundError(cluster_identifier)
@ -817,31 +843,43 @@ class RedshiftBackend(BaseBackend):
raise ClusterParameterGroupNotFoundError(parameter_group_name)
def create_cluster_snapshot(
self, cluster_identifier, snapshot_identifier, region_name, tags
self,
cluster_identifier,
snapshot_identifier,
region_name,
tags,
snapshot_type="manual",
):
cluster = self.clusters.get(cluster_identifier)
if not cluster:
raise ClusterNotFoundError(cluster_identifier)
if self.snapshots.get(snapshot_identifier) is not None:
raise ClusterSnapshotAlreadyExistsError(snapshot_identifier)
snapshot = Snapshot(cluster, snapshot_identifier, region_name, tags)
snapshot = Snapshot(
cluster, snapshot_identifier, region_name, tags, snapshot_type=snapshot_type
)
self.snapshots[snapshot_identifier] = snapshot
return snapshot
def describe_cluster_snapshots(
self, cluster_identifier=None, snapshot_identifier=None
self, cluster_identifier=None, snapshot_identifier=None, snapshot_type=None
):
snapshot_types = (
["automated", "manual"] if snapshot_type is None else [snapshot_type]
)
if cluster_identifier:
cluster_snapshots = []
for snapshot in self.snapshots.values():
if snapshot.cluster.cluster_identifier == cluster_identifier:
cluster_snapshots.append(snapshot)
if snapshot.snapshot_type in snapshot_types:
cluster_snapshots.append(snapshot)
if cluster_snapshots:
return cluster_snapshots
if snapshot_identifier:
if snapshot_identifier in self.snapshots:
return [self.snapshots[snapshot_identifier]]
if self.snapshots[snapshot_identifier].snapshot_type in snapshot_types:
return [self.snapshots[snapshot_identifier]]
raise ClusterSnapshotNotFoundError(snapshot_identifier)
return self.snapshots.values()
@ -850,6 +888,11 @@ class RedshiftBackend(BaseBackend):
if snapshot_identifier not in self.snapshots:
raise ClusterSnapshotNotFoundError(snapshot_identifier)
snapshot = self.describe_cluster_snapshots(
snapshot_identifier=snapshot_identifier
)[0]
if snapshot.snapshot_type == "automated":
raise InvalidClusterSnapshotStateFaultError(snapshot_identifier)
deleted_snapshot = self.snapshots.pop(snapshot_identifier)
deleted_snapshot.status = "deleted"
return deleted_snapshot

View File

@ -554,8 +554,9 @@ class RedshiftResponse(BaseResponse):
def describe_cluster_snapshots(self):
cluster_identifier = self._get_param("ClusterIdentifier")
snapshot_identifier = self._get_param("SnapshotIdentifier")
snapshot_type = self._get_param("SnapshotType")
snapshots = self.redshift_backend.describe_cluster_snapshots(
cluster_identifier, snapshot_identifier
cluster_identifier, snapshot_identifier, snapshot_type
)
return self.get_response(
{

View File

@ -798,6 +798,191 @@ def test_create_cluster_snapshot_of_non_existent_cluster():
).should.throw(ClientError, "Cluster {} not found.".format(cluster_identifier))
@mock_redshift
def test_automated_snapshot_on_cluster_creation():
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "my_cluster"
cluster_response = client.create_cluster(
DBName="test-db",
ClusterIdentifier=cluster_identifier,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="username",
MasterUserPassword="password",
EnhancedVpcRouting=True,
Tags=[{"Key": "tag_key", "Value": "tag_value"}],
)
cluster_response["Cluster"]["Tags"].should.equal(
[{"Key": "tag_key", "Value": "tag_value"}]
)
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier
)
resp_auto_snap["Snapshots"][0]["SnapshotType"].should.equal("automated")
# Tags from cluster are not copied over to automated snapshot
resp_auto_snap["Snapshots"][0]["Tags"].should.equal([])
@mock_redshift
def test_delete_automated_snapshot():
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "my_cluster"
cluster_response = client.create_cluster(
DBName="test-db",
ClusterIdentifier=cluster_identifier,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="username",
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge")
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier
)
snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"]
# Delete automated snapshot should result in error
client.delete_cluster_snapshot.when.called_with(
SnapshotIdentifier=snapshot_identifier
).should.throw(
ClientError,
"Cannot delete the snapshot {0} because only manual snapshots may be deleted".format(
snapshot_identifier
),
)
@mock_redshift
def test_presence_automated_snapshot_on_cluster_delete():
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "my_cluster"
client.create_cluster(
ClusterIdentifier=cluster_identifier,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="username",
MasterUserPassword="password",
)
# Ensure automated snapshot is available
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
resp["Snapshots"].should.have.length_of(1)
# Delete the cluster
cluster_response = client.delete_cluster(
ClusterIdentifier=cluster_identifier, SkipFinalClusterSnapshot=True
)
cluster = cluster_response["Cluster"]
cluster["ClusterIdentifier"].should.equal(cluster_identifier)
# Ensure Automated snapshot is deleted
resp = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
resp["Snapshots"].should.have.length_of(0)
@mock_redshift
def test_describe_snapshot_with_filter():
client = boto3.client("redshift", region_name="us-east-1")
cluster_identifier = "my_cluster"
snapshot_identifier = "my_snapshot"
cluster_response = client.create_cluster(
DBName="test-db",
ClusterIdentifier=cluster_identifier,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="username",
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
cluster_response["Cluster"]["NodeType"].should.equal("ds2.xlarge")
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="automated"
)
auto_snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"]
client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier, ClusterIdentifier=cluster_identifier,
)
resp = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="automated"
)
resp["Snapshots"].should.have.length_of(1)
resp = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="manual"
)
resp["Snapshots"].should.have.length_of(1)
resp = client.describe_cluster_snapshots(
SnapshotIdentifier=snapshot_identifier, SnapshotType="manual"
)
resp["Snapshots"].should.have.length_of(1)
resp = client.describe_cluster_snapshots(
SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="automated"
)
resp["Snapshots"].should.have.length_of(1)
client.describe_cluster_snapshots.when.called_with(
SnapshotIdentifier=snapshot_identifier, SnapshotType="automated"
).should.throw(ClientError, "Snapshot {0} not found.".format(snapshot_identifier))
client.describe_cluster_snapshots.when.called_with(
SnapshotIdentifier=auto_snapshot_identifier, SnapshotType="manual"
).should.throw(
ClientError, "Snapshot {0} not found.".format(auto_snapshot_identifier)
)
@mock_redshift
def test_create_cluster_from_automated_snapshot():
client = boto3.client("redshift", region_name="us-east-1")
original_cluster_identifier = "original-cluster"
new_cluster_identifier = "new-cluster"
client.create_cluster(
ClusterIdentifier=original_cluster_identifier,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="username",
MasterUserPassword="password",
EnhancedVpcRouting=True,
)
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=original_cluster_identifier, SnapshotType="automated"
)
auto_snapshot_identifier = resp_auto_snap["Snapshots"][0]["SnapshotIdentifier"]
client.restore_from_cluster_snapshot.when.called_with(
ClusterIdentifier=original_cluster_identifier,
SnapshotIdentifier=auto_snapshot_identifier,
).should.throw(ClientError, "ClusterAlreadyExists")
response = client.restore_from_cluster_snapshot(
ClusterIdentifier=new_cluster_identifier,
SnapshotIdentifier=auto_snapshot_identifier,
Port=1234,
)
response["Cluster"]["ClusterStatus"].should.equal("creating")
response = client.describe_clusters(ClusterIdentifier=new_cluster_identifier)
new_cluster = response["Clusters"][0]
new_cluster["NodeType"].should.equal("ds2.xlarge")
new_cluster["MasterUsername"].should.equal("username")
new_cluster["Endpoint"]["Port"].should.equal(1234)
new_cluster["EnhancedVpcRouting"].should.equal(True)
# Make sure the new cluster has automated snapshot on cluster creation
resp_auto_snap = client.describe_cluster_snapshots(
ClusterIdentifier=new_cluster_identifier, SnapshotType="automated"
)
resp_auto_snap["Snapshots"].should.have.length_of(1)
@mock_redshift
def test_create_cluster_snapshot():
client = boto3.client("redshift", region_name="us-east-1")
@ -871,7 +1056,9 @@ def test_describe_cluster_snapshots():
snapshot_2["NodeType"].should.equal("ds2.xlarge")
snapshot_2["MasterUsername"].should.equal("username")
resp_clust = client.describe_cluster_snapshots(ClusterIdentifier=cluster_identifier)
resp_clust = client.describe_cluster_snapshots(
ClusterIdentifier=cluster_identifier, SnapshotType="manual"
)
resp_clust["Snapshots"][0].should.equal(resp_snap_1["Snapshots"][0])
resp_clust["Snapshots"][1].should.equal(resp_snap_2["Snapshots"][0])
@ -908,14 +1095,14 @@ def test_delete_cluster_snapshot():
)
snapshots = client.describe_cluster_snapshots()["Snapshots"]
list(snapshots).should.have.length_of(1)
list(snapshots).should.have.length_of(2)
client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)["Snapshot"][
"Status"
].should.equal("deleted")
snapshots = client.describe_cluster_snapshots()["Snapshots"]
list(snapshots).should.have.length_of(0)
list(snapshots).should.have.length_of(1)
# Delete invalid id
client.delete_cluster_snapshot.when.called_with(