From 0a03a7237e50c9867bbe2c7f7bc19565ec75065d Mon Sep 17 00:00:00 2001 From: Brian Pandola Date: Wed, 9 Aug 2017 18:43:21 -0700 Subject: [PATCH] Redshift Updates - Implement create_cluster_snapshot endpoint - Implement describe_cluster_snapshots endpoint - Implement delete_cluster_snapshot endpoint - Implement restore_from_cluster_snapshot endpoint - Implement limited support for describe_tags endpoint - Correctly serialize errors to json (for boto) or xml (for boto3) - Simulate cluster spin up by returning initial status as 'creating' and subsequent statuses as 'available' - Fix issue with modify_cluster endpoint where cluster values get set to None when omitted from request - Add 'Endpoint' key to describe_clusters response syntax --- moto/redshift/exceptions.py | 15 +++ moto/redshift/models.py | 102 ++++++++++++++- moto/redshift/responses.py | 180 +++++++++++++++++++++++++- tests/test_redshift/test_redshift.py | 185 ++++++++++++++++++++++++++- 4 files changed, 474 insertions(+), 8 deletions(-) diff --git a/moto/redshift/exceptions.py b/moto/redshift/exceptions.py index 8bcca807e..877e850e4 100644 --- a/moto/redshift/exceptions.py +++ b/moto/redshift/exceptions.py @@ -56,3 +56,18 @@ class InvalidSubnetError(RedshiftClientError): super(InvalidSubnetError, self).__init__( 'InvalidSubnet', "Subnet {0} not found.".format(subnet_identifier)) + + +class ClusterSnapshotNotFoundError(RedshiftClientError): + def __init__(self, snapshot_identifier): + super(ClusterSnapshotNotFoundError, self).__init__( + 'ClusterSnapshotNotFound', + "Snapshot {0} not found.".format(snapshot_identifier)) + + +class ClusterSnapshotAlreadyExistsError(RedshiftClientError): + def __init__(self, snapshot_identifier): + super(ClusterSnapshotAlreadyExistsError, self).__init__( + 'ClusterSnapshotAlreadyExists', + "Cannot create the snapshot because a snapshot with the " + "identifier {0} already exists".format(snapshot_identifier)) diff --git a/moto/redshift/models.py b/moto/redshift/models.py index 5e64f7a16..29c802fb0 100644 --- a/moto/redshift/models.py +++ b/moto/redshift/models.py @@ -1,12 +1,19 @@ from __future__ import unicode_literals +import copy +import datetime + import boto.redshift +from moto.compat import OrderedDict from moto.core import BaseBackend, BaseModel +from moto.core.utils import iso_8601_datetime_with_milliseconds from moto.ec2 import ec2_backends from .exceptions import ( ClusterNotFoundError, ClusterParameterGroupNotFoundError, ClusterSecurityGroupNotFoundError, + ClusterSnapshotAlreadyExistsError, + ClusterSnapshotNotFoundError, ClusterSubnetGroupNotFoundError, InvalidSubnetError, ) @@ -23,6 +30,7 @@ class Cluster(BaseModel): encrypted, region): self.redshift_backend = redshift_backend self.cluster_identifier = cluster_identifier + self.status = 'available' self.node_type = node_type self.master_username = master_username self.master_user_password = master_user_password @@ -152,7 +160,7 @@ class Cluster(BaseModel): } for group in self.vpc_security_groups], "ClusterSubnetGroupName": self.cluster_subnet_group_name, "AvailabilityZone": self.availability_zone, - "ClusterStatus": "creating", + "ClusterStatus": self.status, "NumberOfNodes": self.number_of_nodes, "AutomatedSnapshotRetentionPeriod": self.automated_snapshot_retention_period, "PubliclyAccessible": self.publicly_accessible, @@ -171,6 +179,13 @@ class Cluster(BaseModel): "NodeType": self.node_type, "ClusterIdentifier": self.cluster_identifier, "AllowVersionUpgrade": self.allow_version_upgrade, + "Endpoint": { + "Address": '{}.{}.redshift.amazonaws.com'.format( + self.cluster_identifier, + self.region), + "Port": self.port + }, + "PendingModifiedValues": [] } @@ -262,6 +277,42 @@ class ParameterGroup(BaseModel): } +class Snapshot(BaseModel): + + def __init__(self, cluster, snapshot_identifier, tags=None): + self.cluster = copy.copy(cluster) + self.snapshot_identifier = snapshot_identifier + self.snapshot_type = 'manual' + self.status = 'available' + self.tags = tags or [] + self.create_time = iso_8601_datetime_with_milliseconds( + datetime.datetime.now()) + + @property + def arn(self): + return "arn:aws:redshift:{0}:1234567890:snapshot:{1}/{2}".format( + self.cluster.region, + self.cluster.cluster_identifier, + self.snapshot_identifier) + + def to_json(self): + return { + 'SnapshotIdentifier': self.snapshot_identifier, + 'ClusterIdentifier': self.cluster.cluster_identifier, + 'SnapshotCreateTime': self.create_time, + 'Status': self.status, + 'Port': self.cluster.port, + 'AvailabilityZone': self.cluster.availability_zone, + 'MasterUsername': self.cluster.master_username, + 'ClusterVersion': self.cluster.cluster_version, + 'SnapshotType': self.snapshot_type, + 'NodeType': self.cluster.node_type, + 'NumberOfNodes': self.cluster.number_of_nodes, + 'DBName': self.cluster.db_name, + 'Tags': self.tags + } + + class RedshiftBackend(BaseBackend): def __init__(self, ec2_backend): @@ -278,6 +329,7 @@ class RedshiftBackend(BaseBackend): ) } self.ec2_backend = ec2_backend + self.snapshots = OrderedDict() def reset(self): ec2_backend = self.ec2_backend @@ -383,6 +435,54 @@ class RedshiftBackend(BaseBackend): return self.parameter_groups.pop(parameter_group_name) raise ClusterParameterGroupNotFoundError(parameter_group_name) + def create_snapshot(self, cluster_identifier, snapshot_identifier, tags): + cluster = self.clusters.get(cluster_identifier) + if not cluster: + raise ClusterNotFoundError(cluster_identifier) + if self.snapshots.get(snapshot_identifier) is not None: + raise ClusterSnapshotAlreadyExistsError(snapshot_identifier) + snapshot = Snapshot(cluster, snapshot_identifier, tags) + self.snapshots[snapshot_identifier] = snapshot + return snapshot + + def describe_snapshots(self, cluster_identifier, snapshot_identifier): + if cluster_identifier: + for snapshot in self.snapshots.values(): + if snapshot.cluster.cluster_identifier == cluster_identifier: + return [snapshot] + raise ClusterNotFoundError(cluster_identifier) + + if snapshot_identifier: + if snapshot_identifier in self.snapshots: + return [self.snapshots[snapshot_identifier]] + raise ClusterSnapshotNotFoundError(snapshot_identifier) + + return self.snapshots.values() + + def delete_snapshot(self, snapshot_identifier): + if snapshot_identifier not in self.snapshots: + raise ClusterSnapshotNotFoundError(snapshot_identifier) + + deleted_snapshot = self.snapshots.pop(snapshot_identifier) + deleted_snapshot.status = 'deleted' + return deleted_snapshot + + def describe_tags_for_resource_type(self, resource_type): + tagged_resources = [] + if resource_type == 'Snapshot': + for snapshot in self.snapshots.values(): + for tag in snapshot.tags: + data = { + 'ResourceName': snapshot.arn, + 'ResourceType': 'snapshot', + 'Tag': { + 'Key': tag['Key'], + 'Value': tag['Value'] + } + } + tagged_resources.append(data) + return tagged_resources + redshift_backends = {} for region in boto.redshift.regions(): diff --git a/moto/redshift/responses.py b/moto/redshift/responses.py index 48f113cf2..411569d01 100644 --- a/moto/redshift/responses.py +++ b/moto/redshift/responses.py @@ -1,12 +1,31 @@ from __future__ import unicode_literals import json + import dicttoxml +from jinja2 import Template +from six import iteritems from moto.core.responses import BaseResponse from .models import redshift_backends +def convert_json_error_to_xml(json_error): + error = json.loads(json_error) + code = error['Error']['Code'] + message = error['Error']['Message'] + template = Template(""" + + + {{ code }} + {{ message }} + Sender + + 6876f774-7273-11e4-85dc-39e55ca848d1 + """) + return template.render(code=code, message=message) + + class RedshiftResponse(BaseResponse): @property @@ -20,6 +39,24 @@ class RedshiftResponse(BaseResponse): xml = dicttoxml.dicttoxml(response, attr_type=False, root=False) return xml.decode("utf-8") + def call_action(self): + status, headers, body = super(RedshiftResponse, self).call_action() + if status >= 400 and not self.request_json: + body = convert_json_error_to_xml(body) + return status, headers, body + + def unpack_complex_list_params(self, label, names): + unpacked_list = list() + count = 1 + while self._get_param('{0}.{1}.{2}'.format(label, count, names[0])): + param = dict() + for i in range(len(names)): + param[names[i]] = self._get_param( + '{0}.{1}.{2}'.format(label, count, names[i])) + unpacked_list.append(param) + count += 1 + return unpacked_list + def create_cluster(self): cluster_kwargs = { "cluster_identifier": self._get_param('ClusterIdentifier'), @@ -43,12 +80,66 @@ class RedshiftResponse(BaseResponse): "encrypted": self._get_param("Encrypted"), "region": self.region, } - cluster = self.redshift_backend.create_cluster(**cluster_kwargs) - + cluster = self.redshift_backend.create_cluster(**cluster_kwargs).to_json() + cluster['ClusterStatus'] = 'creating' return self.get_response({ "CreateClusterResponse": { "CreateClusterResult": { - "Cluster": cluster.to_json(), + "Cluster": cluster, + }, + "ResponseMetadata": { + "RequestId": "384ac68d-3775-11df-8963-01868b7c937a", + } + } + }) + + def restore_from_cluster_snapshot(self): + snapshot_identifier = self._get_param('SnapshotIdentifier') + snapshots = self.redshift_backend.describe_snapshots( + None, + snapshot_identifier) + snapshot = snapshots[0] + kwargs_from_snapshot = { + "node_type": snapshot.cluster.node_type, + "master_username": snapshot.cluster.master_username, + "master_user_password": snapshot.cluster.master_user_password, + "db_name": snapshot.cluster.db_name, + "cluster_type": 'multi-node' if snapshot.cluster.number_of_nodes > 1 else 'single-node', + "availability_zone": snapshot.cluster.availability_zone, + "port": snapshot.cluster.port, + "cluster_version": snapshot.cluster.cluster_version, + "number_of_nodes": snapshot.cluster.number_of_nodes, + } + kwargs_from_request = { + "cluster_identifier": self._get_param('ClusterIdentifier'), + "port": self._get_int_param('Port'), + "availability_zone": self._get_param('AvailabilityZone'), + "allow_version_upgrade": self._get_bool_param( + 'AllowVersionUpgrade'), + "cluster_subnet_group_name": self._get_param( + 'ClusterSubnetGroupName'), + "publicly_accessible": self._get_param("PubliclyAccessible"), + "cluster_parameter_group_name": self._get_param( + 'ClusterParameterGroupName'), + "cluster_security_groups": self._get_multi_param( + 'ClusterSecurityGroups.member'), + "vpc_security_group_ids": self._get_multi_param( + 'VpcSecurityGroupIds.member'), + "preferred_maintenance_window": self._get_param( + 'PreferredMaintenanceWindow'), + "automated_snapshot_retention_period": self._get_int_param( + 'AutomatedSnapshotRetentionPeriod'), + "region": self.region, + "encrypted": False, + } + kwargs_from_snapshot.update(kwargs_from_request) + cluster_kwargs = kwargs_from_snapshot + cluster = self.redshift_backend.create_cluster(**cluster_kwargs).to_json() + cluster['ClusterStatus'] = 'creating' + return self.get_response({ + "RestoreFromClusterSnapshotResponse": { + "RestoreFromClusterSnapshotResult": { + "Cluster": cluster, }, "ResponseMetadata": { "RequestId": "384ac68d-3775-11df-8963-01868b7c937a", @@ -72,7 +163,7 @@ class RedshiftResponse(BaseResponse): }) def modify_cluster(self): - cluster_kwargs = { + request_kwargs = { "cluster_identifier": self._get_param('ClusterIdentifier'), "new_cluster_identifier": self._get_param('NewClusterIdentifier'), "node_type": self._get_param('NodeType'), @@ -90,6 +181,19 @@ class RedshiftResponse(BaseResponse): "publicly_accessible": self._get_param("PubliclyAccessible"), "encrypted": self._get_param("Encrypted"), } + # There's a bug in boto3 where the security group ids are not passed + # according to the AWS documentation + if not request_kwargs['vpc_security_group_ids']: + request_kwargs['vpc_security_group_ids'] = self._get_multi_param( + 'VpcSecurityGroupIds.VpcSecurityGroupId') + + cluster_kwargs = {} + # We only want parameters that were actually passed in, otherwise + # we'll stomp all over our cluster metadata with None values. + for (key, value) in iteritems(request_kwargs): + if value is not None and value != []: + cluster_kwargs[key] = value + cluster = self.redshift_backend.modify_cluster(**cluster_kwargs) return self.get_response({ @@ -273,3 +377,71 @@ class RedshiftResponse(BaseResponse): } } }) + + def create_cluster_snapshot(self): + cluster_identifier = self._get_param('ClusterIdentifier') + snapshot_identifier = self._get_param('SnapshotIdentifier') + tags = self.unpack_complex_list_params( + 'Tags.Tag', ('Key', 'Value')) + snapshot = self.redshift_backend.create_snapshot(cluster_identifier, + snapshot_identifier, + tags) + return self.get_response({ + 'CreateClusterSnapshotResponse': { + "CreateClusterSnapshotResult": { + "Snapshot": snapshot.to_json(), + }, + "ResponseMetadata": { + "RequestId": "384ac68d-3775-11df-8963-01868b7c937a", + } + } + }) + + def describe_cluster_snapshots(self): + cluster_identifier = self._get_param('ClusterIdentifier') + snapshot_identifier = self._get_param('DBSnapshotIdentifier') + snapshots = self.redshift_backend.describe_snapshots(cluster_identifier, + snapshot_identifier) + return self.get_response({ + "DescribeClusterSnapshotsResponse": { + "DescribeClusterSnapshotsResult": { + "Snapshots": [snapshot.to_json() for snapshot in snapshots] + }, + "ResponseMetadata": { + "RequestId": "384ac68d-3775-11df-8963-01868b7c937a", + } + } + }) + + def delete_cluster_snapshot(self): + snapshot_identifier = self._get_param('SnapshotIdentifier') + snapshot = self.redshift_backend.delete_snapshot(snapshot_identifier) + + return self.get_response({ + "DeleteClusterSnapshotResponse": { + "DeleteClusterSnapshotResult": { + "Snapshot": snapshot.to_json() + }, + "ResponseMetadata": { + "RequestId": "384ac68d-3775-11df-8963-01868b7c937a", + } + } + }) + + def describe_tags(self): + resource_type = self._get_param('ResourceType') + if resource_type != 'Snapshot': + raise NotImplementedError( + "The describe_tags action has not been fully implemented.") + tagged_resources = \ + self.redshift_backend.describe_tags_for_resource_type(resource_type) + return self.get_response({ + "DescribeTagsResponse": { + "DescribeTagsResult": { + "TaggedResources": tagged_resources + }, + "ResponseMetadata": { + "RequestId": "384ac68d-3775-11df-8963-01868b7c937a", + } + } + }) diff --git a/tests/test_redshift/test_redshift.py b/tests/test_redshift/test_redshift.py index aff3e8bed..1df503de2 100644 --- a/tests/test_redshift/test_redshift.py +++ b/tests/test_redshift/test_redshift.py @@ -9,6 +9,9 @@ from boto.redshift.exceptions import ( ClusterSubnetGroupNotFound, InvalidSubnet, ) +from botocore.exceptions import ( + ClientError +) import sure # noqa from moto import mock_ec2 @@ -36,7 +39,7 @@ def test_create_cluster(): conn = boto.redshift.connect_to_region("us-east-1") cluster_identifier = 'my_cluster' - conn.create_cluster( + cluster_response = conn.create_cluster( cluster_identifier, node_type="dw.hs1.xlarge", master_username="username", @@ -51,6 +54,8 @@ def test_create_cluster(): allow_version_upgrade=True, number_of_nodes=3, ) + cluster_response['CreateClusterResponse']['CreateClusterResult'][ + 'Cluster']['ClusterStatus'].should.equal('creating') cluster_response = conn.describe_clusters(cluster_identifier) cluster = cluster_response['DescribeClustersResponse'][ @@ -320,7 +325,6 @@ def test_modify_cluster(): cluster_identifier, cluster_type="multi-node", node_type="dw.hs1.xlarge", - number_of_nodes=2, cluster_security_groups="security_group", master_user_password="new_password", cluster_parameter_group_name="my_parameter_group", @@ -343,7 +347,8 @@ def test_modify_cluster(): 'ParameterGroupName'].should.equal("my_parameter_group") cluster['AutomatedSnapshotRetentionPeriod'].should.equal(7) cluster['AllowVersionUpgrade'].should.equal(False) - cluster['NumberOfNodes'].should.equal(2) + # This one should remain unmodified. + cluster['NumberOfNodes'].should.equal(1) @mock_redshift_deprecated @@ -523,3 +528,177 @@ def test_delete_cluster_parameter_group(): # Delete invalid id conn.delete_cluster_parameter_group.when.called_with( "not-a-parameter-group").should.throw(ClusterParameterGroupNotFound) + + +@mock_redshift +def test_create_cluster_snapshot(): + client = boto3.client('redshift', region_name='us-east-1') + cluster_identifier = 'my_cluster' + snapshot_identifier = 'my_snapshot' + + cluster_response = client.create_cluster( + DBName='test-db', + ClusterIdentifier=cluster_identifier, + ClusterType='single-node', + NodeType='ds2.xlarge', + MasterUsername='username', + MasterUserPassword='password', + ) + cluster_response['Cluster']['NodeType'].should.equal('ds2.xlarge') + + snapshot_response = client.create_cluster_snapshot( + SnapshotIdentifier=snapshot_identifier, + ClusterIdentifier=cluster_identifier, + Tags=[{'Key': 'test-tag-key', + 'Value': 'test-tag-value'}] + ) + snapshot = snapshot_response['Snapshot'] + snapshot['SnapshotIdentifier'].should.equal(snapshot_identifier) + snapshot['ClusterIdentifier'].should.equal(cluster_identifier) + snapshot['NumberOfNodes'].should.equal(1) + snapshot['NodeType'].should.equal('ds2.xlarge') + snapshot['MasterUsername'].should.equal('username') + + +@mock_redshift +def test_delete_cluster_snapshot(): + client = boto3.client('redshift', region_name='us-east-1') + cluster_identifier = 'my_cluster' + snapshot_identifier = 'my_snapshot' + + client.create_cluster( + ClusterIdentifier=cluster_identifier, + ClusterType='single-node', + NodeType='ds2.xlarge', + MasterUsername='username', + MasterUserPassword='password', + ) + client.create_cluster_snapshot( + SnapshotIdentifier=snapshot_identifier, + ClusterIdentifier=cluster_identifier + ) + + snapshots = client.describe_cluster_snapshots()['Snapshots'] + list(snapshots).should.have.length_of(1) + + client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)[ + 'Snapshot']['Status'].should.equal('deleted') + + snapshots = client.describe_cluster_snapshots()['Snapshots'] + list(snapshots).should.have.length_of(0) + + # Delete invalid id + client.delete_cluster_snapshot.when.called_with( + SnapshotIdentifier="not-a-snapshot").should.throw(ClientError) + + +@mock_redshift +def test_cluster_snapshot_already_exists(): + client = boto3.client('redshift', region_name='us-east-1') + cluster_identifier = 'my_cluster' + snapshot_identifier = 'my_snapshot' + + client.create_cluster( + DBName='test-db', + ClusterIdentifier=cluster_identifier, + ClusterType='single-node', + NodeType='ds2.xlarge', + MasterUsername='username', + MasterUserPassword='password', + ) + + client.create_cluster_snapshot( + SnapshotIdentifier=snapshot_identifier, + ClusterIdentifier=cluster_identifier + ) + + client.create_cluster_snapshot.when.called_with( + SnapshotIdentifier=snapshot_identifier, + ClusterIdentifier=cluster_identifier + ).should.throw(ClientError) + + +@mock_redshift +def test_create_cluster_from_snapshot(): + client = boto3.client('redshift', region_name='us-east-1') + original_cluster_identifier = 'original-cluster' + original_snapshot_identifier = 'original-snapshot' + new_cluster_identifier = 'new-cluster' + + client.create_cluster( + ClusterIdentifier=original_cluster_identifier, + ClusterType='single-node', + NodeType='ds2.xlarge', + MasterUsername='username', + MasterUserPassword='password', + ) + client.create_cluster_snapshot( + SnapshotIdentifier=original_snapshot_identifier, + ClusterIdentifier=original_cluster_identifier + ) + response = client.restore_from_cluster_snapshot( + ClusterIdentifier=new_cluster_identifier, + SnapshotIdentifier=original_snapshot_identifier, + Port=1234 + ) + response['Cluster']['ClusterStatus'].should.equal('creating') + + response = client.describe_clusters( + ClusterIdentifier=new_cluster_identifier + ) + new_cluster = response['Clusters'][0] + new_cluster['NodeType'].should.equal('ds2.xlarge') + new_cluster['MasterUsername'].should.equal('username') + new_cluster['Endpoint']['Port'].should.equal(1234) + + +@mock_redshift +def test_create_cluster_status_update(): + client = boto3.client('redshift', region_name='us-east-1') + cluster_identifier = 'test-cluster' + + response = client.create_cluster( + ClusterIdentifier=cluster_identifier, + ClusterType='single-node', + NodeType='ds2.xlarge', + MasterUsername='username', + MasterUserPassword='password', + ) + response['Cluster']['ClusterStatus'].should.equal('creating') + + response = client.describe_clusters( + ClusterIdentifier=cluster_identifier + ) + response['Clusters'][0]['ClusterStatus'].should.equal('available') + + +@mock_redshift +def test_describe_snapshot_tags(): + client = boto3.client('redshift', region_name='us-east-1') + cluster_identifier = 'my_cluster' + snapshot_identifier = 'my_snapshot' + tag_key = 'test-tag-key' + tag_value = 'teat-tag-value' + + client.create_cluster( + DBName='test-db', + ClusterIdentifier=cluster_identifier, + ClusterType='single-node', + NodeType='ds2.xlarge', + MasterUsername='username', + MasterUserPassword='password', + ) + + client.create_cluster_snapshot( + SnapshotIdentifier=snapshot_identifier, + ClusterIdentifier=cluster_identifier, + Tags=[{'Key': tag_key, + 'Value': tag_value}] + ) + + tags_response = client.describe_tags(ResourceType='Snapshot') + tagged_resources = tags_response['TaggedResources'] + list(tagged_resources).should.have.length_of(1) + tag = tagged_resources[0]['Tag'] + tag['Key'].should.equal(tag_key) + tag['Value'].should.equal(tag_value)