Redshift Updates

- Implement create_cluster_snapshot endpoint
- Implement describe_cluster_snapshots endpoint
- Implement delete_cluster_snapshot endpoint
- Implement restore_from_cluster_snapshot endpoint
- Implement limited support for describe_tags endpoint
- Correctly serialize errors to json (for boto) or xml (for boto3)
- Simulate cluster spin up by returning initial status as 'creating' and subsequent statuses as 'available'
- Fix issue with modify_cluster endpoint where cluster values get set to None when omitted from request
- Add 'Endpoint' key to describe_clusters response syntax
This commit is contained in:
Brian Pandola 2017-08-09 18:43:21 -07:00
parent efc0a71d96
commit 0a03a7237e
4 changed files with 474 additions and 8 deletions

View File

@ -56,3 +56,18 @@ class InvalidSubnetError(RedshiftClientError):
super(InvalidSubnetError, self).__init__(
'InvalidSubnet',
"Subnet {0} not found.".format(subnet_identifier))
class ClusterSnapshotNotFoundError(RedshiftClientError):
def __init__(self, snapshot_identifier):
super(ClusterSnapshotNotFoundError, self).__init__(
'ClusterSnapshotNotFound',
"Snapshot {0} not found.".format(snapshot_identifier))
class ClusterSnapshotAlreadyExistsError(RedshiftClientError):
def __init__(self, snapshot_identifier):
super(ClusterSnapshotAlreadyExistsError, self).__init__(
'ClusterSnapshotAlreadyExists',
"Cannot create the snapshot because a snapshot with the "
"identifier {0} already exists".format(snapshot_identifier))

View File

@ -1,12 +1,19 @@
from __future__ import unicode_literals
import copy
import datetime
import boto.redshift
from moto.compat import OrderedDict
from moto.core import BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds
from moto.ec2 import ec2_backends
from .exceptions import (
ClusterNotFoundError,
ClusterParameterGroupNotFoundError,
ClusterSecurityGroupNotFoundError,
ClusterSnapshotAlreadyExistsError,
ClusterSnapshotNotFoundError,
ClusterSubnetGroupNotFoundError,
InvalidSubnetError,
)
@ -23,6 +30,7 @@ class Cluster(BaseModel):
encrypted, region):
self.redshift_backend = redshift_backend
self.cluster_identifier = cluster_identifier
self.status = 'available'
self.node_type = node_type
self.master_username = master_username
self.master_user_password = master_user_password
@ -152,7 +160,7 @@ class Cluster(BaseModel):
} for group in self.vpc_security_groups],
"ClusterSubnetGroupName": self.cluster_subnet_group_name,
"AvailabilityZone": self.availability_zone,
"ClusterStatus": "creating",
"ClusterStatus": self.status,
"NumberOfNodes": self.number_of_nodes,
"AutomatedSnapshotRetentionPeriod": self.automated_snapshot_retention_period,
"PubliclyAccessible": self.publicly_accessible,
@ -171,6 +179,13 @@ class Cluster(BaseModel):
"NodeType": self.node_type,
"ClusterIdentifier": self.cluster_identifier,
"AllowVersionUpgrade": self.allow_version_upgrade,
"Endpoint": {
"Address": '{}.{}.redshift.amazonaws.com'.format(
self.cluster_identifier,
self.region),
"Port": self.port
},
"PendingModifiedValues": []
}
@ -262,6 +277,42 @@ class ParameterGroup(BaseModel):
}
class Snapshot(BaseModel):
def __init__(self, cluster, snapshot_identifier, tags=None):
self.cluster = copy.copy(cluster)
self.snapshot_identifier = snapshot_identifier
self.snapshot_type = 'manual'
self.status = 'available'
self.tags = tags or []
self.create_time = iso_8601_datetime_with_milliseconds(
datetime.datetime.now())
@property
def arn(self):
return "arn:aws:redshift:{0}:1234567890:snapshot:{1}/{2}".format(
self.cluster.region,
self.cluster.cluster_identifier,
self.snapshot_identifier)
def to_json(self):
return {
'SnapshotIdentifier': self.snapshot_identifier,
'ClusterIdentifier': self.cluster.cluster_identifier,
'SnapshotCreateTime': self.create_time,
'Status': self.status,
'Port': self.cluster.port,
'AvailabilityZone': self.cluster.availability_zone,
'MasterUsername': self.cluster.master_username,
'ClusterVersion': self.cluster.cluster_version,
'SnapshotType': self.snapshot_type,
'NodeType': self.cluster.node_type,
'NumberOfNodes': self.cluster.number_of_nodes,
'DBName': self.cluster.db_name,
'Tags': self.tags
}
class RedshiftBackend(BaseBackend):
def __init__(self, ec2_backend):
@ -278,6 +329,7 @@ class RedshiftBackend(BaseBackend):
)
}
self.ec2_backend = ec2_backend
self.snapshots = OrderedDict()
def reset(self):
ec2_backend = self.ec2_backend
@ -383,6 +435,54 @@ class RedshiftBackend(BaseBackend):
return self.parameter_groups.pop(parameter_group_name)
raise ClusterParameterGroupNotFoundError(parameter_group_name)
def create_snapshot(self, cluster_identifier, snapshot_identifier, tags):
cluster = self.clusters.get(cluster_identifier)
if not cluster:
raise ClusterNotFoundError(cluster_identifier)
if self.snapshots.get(snapshot_identifier) is not None:
raise ClusterSnapshotAlreadyExistsError(snapshot_identifier)
snapshot = Snapshot(cluster, snapshot_identifier, tags)
self.snapshots[snapshot_identifier] = snapshot
return snapshot
def describe_snapshots(self, cluster_identifier, snapshot_identifier):
if cluster_identifier:
for snapshot in self.snapshots.values():
if snapshot.cluster.cluster_identifier == cluster_identifier:
return [snapshot]
raise ClusterNotFoundError(cluster_identifier)
if snapshot_identifier:
if snapshot_identifier in self.snapshots:
return [self.snapshots[snapshot_identifier]]
raise ClusterSnapshotNotFoundError(snapshot_identifier)
return self.snapshots.values()
def delete_snapshot(self, snapshot_identifier):
if snapshot_identifier not in self.snapshots:
raise ClusterSnapshotNotFoundError(snapshot_identifier)
deleted_snapshot = self.snapshots.pop(snapshot_identifier)
deleted_snapshot.status = 'deleted'
return deleted_snapshot
def describe_tags_for_resource_type(self, resource_type):
tagged_resources = []
if resource_type == 'Snapshot':
for snapshot in self.snapshots.values():
for tag in snapshot.tags:
data = {
'ResourceName': snapshot.arn,
'ResourceType': 'snapshot',
'Tag': {
'Key': tag['Key'],
'Value': tag['Value']
}
}
tagged_resources.append(data)
return tagged_resources
redshift_backends = {}
for region in boto.redshift.regions():

View File

@ -1,12 +1,31 @@
from __future__ import unicode_literals
import json
import dicttoxml
from jinja2 import Template
from six import iteritems
from moto.core.responses import BaseResponse
from .models import redshift_backends
def convert_json_error_to_xml(json_error):
error = json.loads(json_error)
code = error['Error']['Code']
message = error['Error']['Message']
template = Template("""
<RedshiftClientError>
<Error>
<Code>{{ code }}</Code>
<Message>{{ message }}</Message>
<Type>Sender</Type>
</Error>
<RequestId>6876f774-7273-11e4-85dc-39e55ca848d1</RequestId>
</RedshiftClientError>""")
return template.render(code=code, message=message)
class RedshiftResponse(BaseResponse):
@property
@ -20,6 +39,24 @@ class RedshiftResponse(BaseResponse):
xml = dicttoxml.dicttoxml(response, attr_type=False, root=False)
return xml.decode("utf-8")
def call_action(self):
status, headers, body = super(RedshiftResponse, self).call_action()
if status >= 400 and not self.request_json:
body = convert_json_error_to_xml(body)
return status, headers, body
def unpack_complex_list_params(self, label, names):
unpacked_list = list()
count = 1
while self._get_param('{0}.{1}.{2}'.format(label, count, names[0])):
param = dict()
for i in range(len(names)):
param[names[i]] = self._get_param(
'{0}.{1}.{2}'.format(label, count, names[i]))
unpacked_list.append(param)
count += 1
return unpacked_list
def create_cluster(self):
cluster_kwargs = {
"cluster_identifier": self._get_param('ClusterIdentifier'),
@ -43,12 +80,66 @@ class RedshiftResponse(BaseResponse):
"encrypted": self._get_param("Encrypted"),
"region": self.region,
}
cluster = self.redshift_backend.create_cluster(**cluster_kwargs)
cluster = self.redshift_backend.create_cluster(**cluster_kwargs).to_json()
cluster['ClusterStatus'] = 'creating'
return self.get_response({
"CreateClusterResponse": {
"CreateClusterResult": {
"Cluster": cluster.to_json(),
"Cluster": cluster,
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})
def restore_from_cluster_snapshot(self):
snapshot_identifier = self._get_param('SnapshotIdentifier')
snapshots = self.redshift_backend.describe_snapshots(
None,
snapshot_identifier)
snapshot = snapshots[0]
kwargs_from_snapshot = {
"node_type": snapshot.cluster.node_type,
"master_username": snapshot.cluster.master_username,
"master_user_password": snapshot.cluster.master_user_password,
"db_name": snapshot.cluster.db_name,
"cluster_type": 'multi-node' if snapshot.cluster.number_of_nodes > 1 else 'single-node',
"availability_zone": snapshot.cluster.availability_zone,
"port": snapshot.cluster.port,
"cluster_version": snapshot.cluster.cluster_version,
"number_of_nodes": snapshot.cluster.number_of_nodes,
}
kwargs_from_request = {
"cluster_identifier": self._get_param('ClusterIdentifier'),
"port": self._get_int_param('Port'),
"availability_zone": self._get_param('AvailabilityZone'),
"allow_version_upgrade": self._get_bool_param(
'AllowVersionUpgrade'),
"cluster_subnet_group_name": self._get_param(
'ClusterSubnetGroupName'),
"publicly_accessible": self._get_param("PubliclyAccessible"),
"cluster_parameter_group_name": self._get_param(
'ClusterParameterGroupName'),
"cluster_security_groups": self._get_multi_param(
'ClusterSecurityGroups.member'),
"vpc_security_group_ids": self._get_multi_param(
'VpcSecurityGroupIds.member'),
"preferred_maintenance_window": self._get_param(
'PreferredMaintenanceWindow'),
"automated_snapshot_retention_period": self._get_int_param(
'AutomatedSnapshotRetentionPeriod'),
"region": self.region,
"encrypted": False,
}
kwargs_from_snapshot.update(kwargs_from_request)
cluster_kwargs = kwargs_from_snapshot
cluster = self.redshift_backend.create_cluster(**cluster_kwargs).to_json()
cluster['ClusterStatus'] = 'creating'
return self.get_response({
"RestoreFromClusterSnapshotResponse": {
"RestoreFromClusterSnapshotResult": {
"Cluster": cluster,
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
@ -72,7 +163,7 @@ class RedshiftResponse(BaseResponse):
})
def modify_cluster(self):
cluster_kwargs = {
request_kwargs = {
"cluster_identifier": self._get_param('ClusterIdentifier'),
"new_cluster_identifier": self._get_param('NewClusterIdentifier'),
"node_type": self._get_param('NodeType'),
@ -90,6 +181,19 @@ class RedshiftResponse(BaseResponse):
"publicly_accessible": self._get_param("PubliclyAccessible"),
"encrypted": self._get_param("Encrypted"),
}
# There's a bug in boto3 where the security group ids are not passed
# according to the AWS documentation
if not request_kwargs['vpc_security_group_ids']:
request_kwargs['vpc_security_group_ids'] = self._get_multi_param(
'VpcSecurityGroupIds.VpcSecurityGroupId')
cluster_kwargs = {}
# We only want parameters that were actually passed in, otherwise
# we'll stomp all over our cluster metadata with None values.
for (key, value) in iteritems(request_kwargs):
if value is not None and value != []:
cluster_kwargs[key] = value
cluster = self.redshift_backend.modify_cluster(**cluster_kwargs)
return self.get_response({
@ -273,3 +377,71 @@ class RedshiftResponse(BaseResponse):
}
}
})
def create_cluster_snapshot(self):
cluster_identifier = self._get_param('ClusterIdentifier')
snapshot_identifier = self._get_param('SnapshotIdentifier')
tags = self.unpack_complex_list_params(
'Tags.Tag', ('Key', 'Value'))
snapshot = self.redshift_backend.create_snapshot(cluster_identifier,
snapshot_identifier,
tags)
return self.get_response({
'CreateClusterSnapshotResponse': {
"CreateClusterSnapshotResult": {
"Snapshot": snapshot.to_json(),
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})
def describe_cluster_snapshots(self):
cluster_identifier = self._get_param('ClusterIdentifier')
snapshot_identifier = self._get_param('DBSnapshotIdentifier')
snapshots = self.redshift_backend.describe_snapshots(cluster_identifier,
snapshot_identifier)
return self.get_response({
"DescribeClusterSnapshotsResponse": {
"DescribeClusterSnapshotsResult": {
"Snapshots": [snapshot.to_json() for snapshot in snapshots]
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})
def delete_cluster_snapshot(self):
snapshot_identifier = self._get_param('SnapshotIdentifier')
snapshot = self.redshift_backend.delete_snapshot(snapshot_identifier)
return self.get_response({
"DeleteClusterSnapshotResponse": {
"DeleteClusterSnapshotResult": {
"Snapshot": snapshot.to_json()
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})
def describe_tags(self):
resource_type = self._get_param('ResourceType')
if resource_type != 'Snapshot':
raise NotImplementedError(
"The describe_tags action has not been fully implemented.")
tagged_resources = \
self.redshift_backend.describe_tags_for_resource_type(resource_type)
return self.get_response({
"DescribeTagsResponse": {
"DescribeTagsResult": {
"TaggedResources": tagged_resources
},
"ResponseMetadata": {
"RequestId": "384ac68d-3775-11df-8963-01868b7c937a",
}
}
})

View File

@ -9,6 +9,9 @@ from boto.redshift.exceptions import (
ClusterSubnetGroupNotFound,
InvalidSubnet,
)
from botocore.exceptions import (
ClientError
)
import sure # noqa
from moto import mock_ec2
@ -36,7 +39,7 @@ def test_create_cluster():
conn = boto.redshift.connect_to_region("us-east-1")
cluster_identifier = 'my_cluster'
conn.create_cluster(
cluster_response = conn.create_cluster(
cluster_identifier,
node_type="dw.hs1.xlarge",
master_username="username",
@ -51,6 +54,8 @@ def test_create_cluster():
allow_version_upgrade=True,
number_of_nodes=3,
)
cluster_response['CreateClusterResponse']['CreateClusterResult'][
'Cluster']['ClusterStatus'].should.equal('creating')
cluster_response = conn.describe_clusters(cluster_identifier)
cluster = cluster_response['DescribeClustersResponse'][
@ -320,7 +325,6 @@ def test_modify_cluster():
cluster_identifier,
cluster_type="multi-node",
node_type="dw.hs1.xlarge",
number_of_nodes=2,
cluster_security_groups="security_group",
master_user_password="new_password",
cluster_parameter_group_name="my_parameter_group",
@ -343,7 +347,8 @@ def test_modify_cluster():
'ParameterGroupName'].should.equal("my_parameter_group")
cluster['AutomatedSnapshotRetentionPeriod'].should.equal(7)
cluster['AllowVersionUpgrade'].should.equal(False)
cluster['NumberOfNodes'].should.equal(2)
# This one should remain unmodified.
cluster['NumberOfNodes'].should.equal(1)
@mock_redshift_deprecated
@ -523,3 +528,177 @@ def test_delete_cluster_parameter_group():
# Delete invalid id
conn.delete_cluster_parameter_group.when.called_with(
"not-a-parameter-group").should.throw(ClusterParameterGroupNotFound)
@mock_redshift
def test_create_cluster_snapshot():
client = boto3.client('redshift', region_name='us-east-1')
cluster_identifier = 'my_cluster'
snapshot_identifier = 'my_snapshot'
cluster_response = client.create_cluster(
DBName='test-db',
ClusterIdentifier=cluster_identifier,
ClusterType='single-node',
NodeType='ds2.xlarge',
MasterUsername='username',
MasterUserPassword='password',
)
cluster_response['Cluster']['NodeType'].should.equal('ds2.xlarge')
snapshot_response = client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
ClusterIdentifier=cluster_identifier,
Tags=[{'Key': 'test-tag-key',
'Value': 'test-tag-value'}]
)
snapshot = snapshot_response['Snapshot']
snapshot['SnapshotIdentifier'].should.equal(snapshot_identifier)
snapshot['ClusterIdentifier'].should.equal(cluster_identifier)
snapshot['NumberOfNodes'].should.equal(1)
snapshot['NodeType'].should.equal('ds2.xlarge')
snapshot['MasterUsername'].should.equal('username')
@mock_redshift
def test_delete_cluster_snapshot():
client = boto3.client('redshift', region_name='us-east-1')
cluster_identifier = 'my_cluster'
snapshot_identifier = 'my_snapshot'
client.create_cluster(
ClusterIdentifier=cluster_identifier,
ClusterType='single-node',
NodeType='ds2.xlarge',
MasterUsername='username',
MasterUserPassword='password',
)
client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
ClusterIdentifier=cluster_identifier
)
snapshots = client.describe_cluster_snapshots()['Snapshots']
list(snapshots).should.have.length_of(1)
client.delete_cluster_snapshot(SnapshotIdentifier=snapshot_identifier)[
'Snapshot']['Status'].should.equal('deleted')
snapshots = client.describe_cluster_snapshots()['Snapshots']
list(snapshots).should.have.length_of(0)
# Delete invalid id
client.delete_cluster_snapshot.when.called_with(
SnapshotIdentifier="not-a-snapshot").should.throw(ClientError)
@mock_redshift
def test_cluster_snapshot_already_exists():
client = boto3.client('redshift', region_name='us-east-1')
cluster_identifier = 'my_cluster'
snapshot_identifier = 'my_snapshot'
client.create_cluster(
DBName='test-db',
ClusterIdentifier=cluster_identifier,
ClusterType='single-node',
NodeType='ds2.xlarge',
MasterUsername='username',
MasterUserPassword='password',
)
client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
ClusterIdentifier=cluster_identifier
)
client.create_cluster_snapshot.when.called_with(
SnapshotIdentifier=snapshot_identifier,
ClusterIdentifier=cluster_identifier
).should.throw(ClientError)
@mock_redshift
def test_create_cluster_from_snapshot():
client = boto3.client('redshift', region_name='us-east-1')
original_cluster_identifier = 'original-cluster'
original_snapshot_identifier = 'original-snapshot'
new_cluster_identifier = 'new-cluster'
client.create_cluster(
ClusterIdentifier=original_cluster_identifier,
ClusterType='single-node',
NodeType='ds2.xlarge',
MasterUsername='username',
MasterUserPassword='password',
)
client.create_cluster_snapshot(
SnapshotIdentifier=original_snapshot_identifier,
ClusterIdentifier=original_cluster_identifier
)
response = client.restore_from_cluster_snapshot(
ClusterIdentifier=new_cluster_identifier,
SnapshotIdentifier=original_snapshot_identifier,
Port=1234
)
response['Cluster']['ClusterStatus'].should.equal('creating')
response = client.describe_clusters(
ClusterIdentifier=new_cluster_identifier
)
new_cluster = response['Clusters'][0]
new_cluster['NodeType'].should.equal('ds2.xlarge')
new_cluster['MasterUsername'].should.equal('username')
new_cluster['Endpoint']['Port'].should.equal(1234)
@mock_redshift
def test_create_cluster_status_update():
client = boto3.client('redshift', region_name='us-east-1')
cluster_identifier = 'test-cluster'
response = client.create_cluster(
ClusterIdentifier=cluster_identifier,
ClusterType='single-node',
NodeType='ds2.xlarge',
MasterUsername='username',
MasterUserPassword='password',
)
response['Cluster']['ClusterStatus'].should.equal('creating')
response = client.describe_clusters(
ClusterIdentifier=cluster_identifier
)
response['Clusters'][0]['ClusterStatus'].should.equal('available')
@mock_redshift
def test_describe_snapshot_tags():
client = boto3.client('redshift', region_name='us-east-1')
cluster_identifier = 'my_cluster'
snapshot_identifier = 'my_snapshot'
tag_key = 'test-tag-key'
tag_value = 'teat-tag-value'
client.create_cluster(
DBName='test-db',
ClusterIdentifier=cluster_identifier,
ClusterType='single-node',
NodeType='ds2.xlarge',
MasterUsername='username',
MasterUserPassword='password',
)
client.create_cluster_snapshot(
SnapshotIdentifier=snapshot_identifier,
ClusterIdentifier=cluster_identifier,
Tags=[{'Key': tag_key,
'Value': tag_value}]
)
tags_response = client.describe_tags(ResourceType='Snapshot')
tagged_resources = tags_response['TaggedResources']
list(tagged_resources).should.have.length_of(1)
tag = tagged_resources[0]['Tag']
tag['Key'].should.equal(tag_key)
tag['Value'].should.equal(tag_value)