Merge pull request #1473 from Logicworks/feature/ebs-copy-snapshot

Add support for copying EBS snapshots
This commit is contained in:
Steve Pulec 2018-04-18 22:51:54 -04:00 committed by GitHub
commit 8fa4c64c15
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 3 deletions

View File

@ -1335,7 +1335,7 @@
- [ ] confirm_product_instance
- [ ] copy_fpga_image
- [X] copy_image
- [ ] copy_snapshot
- [X] copy_snapshot
- [X] create_customer_gateway
- [ ] create_default_subnet
- [ ] create_default_vpc

View File

@ -1971,6 +1971,15 @@ class EBSBackend(object):
matches = generic_filter(filters, matches)
return matches
def copy_snapshot(self, source_snapshot_id, source_region, description=None):
source_snapshot = ec2_backends[source_region].describe_snapshots(
snapshot_ids=[source_snapshot_id])[0]
snapshot_id = random_snapshot_id()
snapshot = Snapshot(self, snapshot_id, volume=source_snapshot.volume,
description=description, encrypted=source_snapshot.encrypted)
self.snapshots[snapshot_id] = snapshot
return snapshot
def get_snapshot(self, snapshot_id):
snapshot = self.snapshots.get(snapshot_id, None)
if not snapshot:

View File

@ -16,9 +16,14 @@ class ElasticBlockStore(BaseResponse):
return template.render(attachment=attachment)
def copy_snapshot(self):
source_snapshot_id = self._get_param('SourceSnapshotId')
source_region = self._get_param('SourceRegion')
description = self._get_param('Description')
if self.is_not_dryrun('CopySnapshot'):
raise NotImplementedError(
'ElasticBlockStore.copy_snapshot is not yet implemented')
snapshot = self.ec2_backend.copy_snapshot(
source_snapshot_id, source_region, description)
template = self.response_template(COPY_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot)
def create_snapshot(self):
volume_id = self._get_param('VolumeId')
@ -248,6 +253,11 @@ CREATE_SNAPSHOT_RESPONSE = """<CreateSnapshotResponse xmlns="http://ec2.amazonaw
</tagSet>
</CreateSnapshotResponse>"""
COPY_SNAPSHOT_RESPONSE = """<CopySnapshotResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<snapshotId>{{ snapshot.id }}</snapshotId>
</CopySnapshotResponse>"""
DESCRIBE_SNAPSHOTS_RESPONSE = """<DescribeSnapshotsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<snapshotSet>

View File

@ -6,6 +6,7 @@ from nose.tools import assert_raises
from moto.ec2 import ec2_backends
import boto
import boto3
from botocore.exceptions import ClientError
from boto.exception import EC2ResponseError
import sure # noqa
@ -587,6 +588,59 @@ def test_volume_tag_escaping():
dict(snaps[0].tags).should.equal({'key': '</closed>'})
@mock_ec2
def test_copy_snapshot():
ec2_client = boto3.client('ec2', region_name='eu-west-1')
dest_ec2_client = boto3.client('ec2', region_name='eu-west-2')
volume_response = ec2_client.create_volume(
AvailabilityZone='eu-west-1a', Size=10
)
create_snapshot_response = ec2_client.create_snapshot(
VolumeId=volume_response['VolumeId']
)
copy_snapshot_response = dest_ec2_client.copy_snapshot(
SourceSnapshotId=create_snapshot_response['SnapshotId'],
SourceRegion="eu-west-1"
)
ec2 = boto3.resource('ec2', region_name='eu-west-1')
dest_ec2 = boto3.resource('ec2', region_name='eu-west-2')
source = ec2.Snapshot(create_snapshot_response['SnapshotId'])
dest = dest_ec2.Snapshot(copy_snapshot_response['SnapshotId'])
attribs = ['data_encryption_key_id', 'encrypted',
'kms_key_id', 'owner_alias', 'owner_id', 'progress',
'start_time', 'state', 'state_message',
'tags', 'volume_id', 'volume_size']
for attrib in attribs:
getattr(source, attrib).should.equal(getattr(dest, attrib))
# Copy from non-existent source ID.
with assert_raises(ClientError) as cm:
create_snapshot_error = ec2_client.create_snapshot(
VolumeId='vol-abcd1234'
)
cm.exception.response['Error']['Code'].should.equal('InvalidVolume.NotFound')
cm.exception.response['Error']['Message'].should.equal("The volume 'vol-abcd1234' does not exist.")
cm.exception.response['ResponseMetadata']['RequestId'].should_not.be.none
cm.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
# Copy from non-existent source region.
with assert_raises(ClientError) as cm:
copy_snapshot_response = dest_ec2_client.copy_snapshot(
SourceSnapshotId=create_snapshot_response['SnapshotId'],
SourceRegion="eu-west-2"
)
cm.exception.response['Error']['Code'].should.equal('InvalidSnapshot.NotFound')
cm.exception.response['Error']['Message'].should.be.none
cm.exception.response['ResponseMetadata']['RequestId'].should_not.be.none
cm.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
@mock_ec2
def test_search_for_many_snapshots():
ec2_client = boto3.client('ec2', region_name='eu-west-1')