Implement UserIds for Snapshot Attributes (#3192)

* implement register_image

* format code

* add user_ids to snapshot model

* implement register_image

* format code

* add user_ids to snapshot model

* trying to un-deprecate tests

* Write tests and finalize implementation

* Add region parameter to boto3 resource call

* fixed test error
This commit is contained in:
Larry Aiello 2020-08-01 12:03:54 -04:00 committed by GitHub
parent 9a9a1d8413
commit 06ed67a8e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 221 additions and 48 deletions

View File

@ -2488,6 +2488,7 @@ class Snapshot(TaggedEC2Resource):
self.description = description
self.start_time = utc_date_and_time()
self.create_volume_permission_groups = set()
self.create_volume_permission_userids = set()
self.ec2_backend = ec2_backend
self.status = "completed"
self.encrypted = encrypted
@ -2652,28 +2653,32 @@ class EBSBackend(object):
snapshot = self.get_snapshot(snapshot_id)
return snapshot.create_volume_permission_groups
def add_create_volume_permission(self, snapshot_id, user_id=None, group=None):
if user_id:
self.raise_not_implemented_error(
"The UserId parameter for ModifySnapshotAttribute"
)
if group != "all":
raise InvalidAMIAttributeItemValueError("UserGroup", group)
def get_create_volume_permission_userids(self, snapshot_id):
snapshot = self.get_snapshot(snapshot_id)
snapshot.create_volume_permission_groups.add(group)
return snapshot.create_volume_permission_userids
def add_create_volume_permission(self, snapshot_id, user_ids=None, groups=None):
snapshot = self.get_snapshot(snapshot_id)
if user_ids:
snapshot.create_volume_permission_userids.update(user_ids)
if groups and groups != ["all"]:
raise InvalidAMIAttributeItemValueError("UserGroup", groups)
else:
snapshot.create_volume_permission_groups.update(groups)
return True
def remove_create_volume_permission(self, snapshot_id, user_id=None, group=None):
if user_id:
self.raise_not_implemented_error(
"The UserId parameter for ModifySnapshotAttribute"
)
if group != "all":
raise InvalidAMIAttributeItemValueError("UserGroup", group)
def remove_create_volume_permission(self, snapshot_id, user_ids=None, groups=None):
snapshot = self.get_snapshot(snapshot_id)
snapshot.create_volume_permission_groups.discard(group)
if user_ids:
snapshot.create_volume_permission_userids.difference_update(user_ids)
if groups and groups != ["all"]:
raise InvalidAMIAttributeItemValueError("UserGroup", groups)
else:
snapshot.create_volume_permission_groups.difference_update(groups)
return True

View File

@ -116,22 +116,23 @@ class ElasticBlockStore(BaseResponse):
def describe_snapshot_attribute(self):
snapshot_id = self._get_param("SnapshotId")
groups = self.ec2_backend.get_create_volume_permission_groups(snapshot_id)
user_ids = self.ec2_backend.get_create_volume_permission_userids(snapshot_id)
template = self.response_template(DESCRIBE_SNAPSHOT_ATTRIBUTES_RESPONSE)
return template.render(snapshot_id=snapshot_id, groups=groups)
return template.render(snapshot_id=snapshot_id, groups=groups, userIds=user_ids)
def modify_snapshot_attribute(self):
snapshot_id = self._get_param("SnapshotId")
operation_type = self._get_param("OperationType")
group = self._get_param("UserGroup.1")
user_id = self._get_param("UserId.1")
groups = self._get_multi_param("UserGroup")
user_ids = self._get_multi_param("UserId")
if self.is_not_dryrun("ModifySnapshotAttribute"):
if operation_type == "add":
self.ec2_backend.add_create_volume_permission(
snapshot_id, user_id=user_id, group=group
snapshot_id, user_ids=user_ids, groups=groups
)
elif operation_type == "remove":
self.ec2_backend.remove_create_volume_permission(
snapshot_id, user_id=user_id, group=group
snapshot_id, user_ids=user_ids, groups=groups
)
return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE
@ -311,18 +312,18 @@ DESCRIBE_SNAPSHOT_ATTRIBUTES_RESPONSE = """
<DescribeSnapshotAttributeResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>a9540c9f-161a-45d8-9cc1-1182b89ad69f</requestId>
<snapshotId>snap-a0332ee0</snapshotId>
{% if not groups %}
<createVolumePermission/>
{% endif %}
{% if groups %}
<createVolumePermission>
{% for group in groups %}
<item>
<group>{{ group }}</group>
</item>
{% endfor %}
</createVolumePermission>
{% endif %}
<createVolumePermission>
{% for group in groups %}
<item>
<group>{{ group }}</group>
</item>
{% endfor %}
{% for userId in userIds %}
<item>
<userId>{{ userId }}</userId>
</item>
{% endfor %}
</createVolumePermission>
</DescribeSnapshotAttributeResponse>
"""

View File

@ -783,6 +783,16 @@ def test_ami_registration():
assert images[0]["State"] == "available", "State should be available."
@mock_ec2
def test_ami_registration():
ec2 = boto3.client("ec2", region_name="us-east-1")
image_id = ec2.register_image(Name="test-register-image").get("ImageId", "")
images = ec2.describe_images(ImageIds=[image_id]).get("Images", [])
assert images[0]["Name"] == "test-register-image", "No image was registered."
assert images[0]["RootDeviceName"] == "/dev/sda1", "Wrong root device name."
assert images[0]["State"] == "available", "State should be available."
@mock_ec2
def test_ami_filter_wildcard():
ec2_resource = boto3.resource("ec2", region_name="us-west-1")

View File

@ -562,19 +562,176 @@ def test_snapshot_attribute():
cm.exception.status.should.equal(400)
cm.exception.request_id.should_not.be.none
# Error: Add or remove with user ID instead of group
conn.modify_snapshot_attribute.when.called_with(
snapshot.id,
attribute="createVolumePermission",
operation="add",
user_ids=["user"],
).should.throw(NotImplementedError)
conn.modify_snapshot_attribute.when.called_with(
snapshot.id,
attribute="createVolumePermission",
operation="remove",
user_ids=["user"],
).should.throw(NotImplementedError)
@mock_ec2
def test_modify_snapshot_attribute():
import copy
ec2_client = boto3.client("ec2", region_name="us-east-1")
response = ec2_client.create_volume(Size=80, AvailabilityZone="us-east-1a")
volume = boto3.resource("ec2", region_name="us-east-1").Volume(response["VolumeId"])
snapshot = volume.create_snapshot()
# Baseline
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert not attributes[
"CreateVolumePermissions"
], "Snapshot should have no permissions."
ADD_GROUP_ARGS = {
"SnapshotId": snapshot.id,
"Attribute": "createVolumePermission",
"OperationType": "add",
"GroupNames": ["all"],
}
REMOVE_GROUP_ARGS = {
"SnapshotId": snapshot.id,
"Attribute": "createVolumePermission",
"OperationType": "remove",
"GroupNames": ["all"],
}
# Add 'all' group and confirm
with assert_raises(ClientError) as cm:
ec2_client.modify_snapshot_attribute(**dict(ADD_GROUP_ARGS, **{"DryRun": True}))
cm.exception.response["Error"]["Code"].should.equal("DryRunOperation")
cm.exception.response["ResponseMetadata"]["RequestId"].should_not.be.none
cm.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ec2_client.modify_snapshot_attribute(**ADD_GROUP_ARGS)
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert attributes["CreateVolumePermissions"] == [
{"Group": "all"}
], "This snapshot should have public group permissions."
# Add is idempotent
ec2_client.modify_snapshot_attribute.when.called_with(
**ADD_GROUP_ARGS
).should_not.throw(ClientError)
assert attributes["CreateVolumePermissions"] == [
{"Group": "all"}
], "This snapshot should have public group permissions."
# Remove 'all' group and confirm
with assert_raises(ClientError) as ex:
ec2_client.modify_snapshot_attribute(
**dict(REMOVE_GROUP_ARGS, **{"DryRun": True})
)
cm.exception.response["Error"]["Code"].should.equal("DryRunOperation")
cm.exception.response["ResponseMetadata"]["RequestId"].should_not.be.none
cm.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
ec2_client.modify_snapshot_attribute(**REMOVE_GROUP_ARGS)
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert not attributes[
"CreateVolumePermissions"
], "This snapshot should have no permissions."
# Remove is idempotent
ec2_client.modify_snapshot_attribute.when.called_with(
**REMOVE_GROUP_ARGS
).should_not.throw(ClientError)
assert not attributes[
"CreateVolumePermissions"
], "This snapshot should have no permissions."
# Error: Add with group != 'all'
with assert_raises(ClientError) as cm:
ec2_client.modify_snapshot_attribute(
SnapshotId=snapshot.id,
Attribute="createVolumePermission",
OperationType="add",
GroupNames=["everyone"],
)
cm.exception.response["Error"]["Code"].should.equal("InvalidAMIAttributeItemValue")
cm.exception.response["ResponseMetadata"]["RequestId"].should_not.be.none
cm.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
# Error: Add with invalid snapshot ID
with assert_raises(ClientError) as cm:
ec2_client.modify_snapshot_attribute(
SnapshotId="snapshot-abcd1234",
Attribute="createVolumePermission",
OperationType="add",
GroupNames=["all"],
)
cm.exception.response["Error"]["Code"].should.equal("InvalidSnapshot.NotFound")
cm.exception.response["ResponseMetadata"]["RequestId"].should_not.be.none
cm.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
# Error: Remove with invalid snapshot ID
with assert_raises(ClientError) as cm:
ec2_client.modify_snapshot_attribute(
SnapshotId="snapshot-abcd1234",
Attribute="createVolumePermission",
OperationType="remove",
GroupNames=["all"],
)
cm.exception.response["Error"]["Code"].should.equal("InvalidSnapshot.NotFound")
cm.exception.response["ResponseMetadata"]["RequestId"].should_not.be.none
cm.exception.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
# Test adding user id
ec2_client.modify_snapshot_attribute(
SnapshotId=snapshot.id,
Attribute="createVolumePermission",
OperationType="add",
UserIds=["1234567891"],
)
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert len(attributes["CreateVolumePermissions"]) == 1
# Test adding user id again along with additional.
ec2_client.modify_snapshot_attribute(
SnapshotId=snapshot.id,
Attribute="createVolumePermission",
OperationType="add",
UserIds=["1234567891", "2345678912"],
)
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert len(attributes["CreateVolumePermissions"]) == 2
# Test removing both user IDs.
ec2_client.modify_snapshot_attribute(
SnapshotId=snapshot.id,
Attribute="createVolumePermission",
OperationType="remove",
UserIds=["1234567891", "2345678912"],
)
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert len(attributes["CreateVolumePermissions"]) == 0
# Idempotency when removing users.
ec2_client.modify_snapshot_attribute(
SnapshotId=snapshot.id,
Attribute="createVolumePermission",
OperationType="remove",
UserIds=["1234567891"],
)
attributes = ec2_client.describe_snapshot_attribute(
SnapshotId=snapshot.id, Attribute="createVolumePermission"
)
assert len(attributes["CreateVolumePermissions"]) == 0
@mock_ec2_deprecated