EC2: modify_image_attributes() now supports all LaunchPermissions (#7324)

This commit is contained in:
Bert Blommers 2024-02-08 22:21:42 +00:00 committed by GitHub
parent 9a5f0a065e
commit 2c3f735e85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 129 additions and 84 deletions

View File

@ -2523,7 +2523,7 @@
- [X] modify_hosts - [X] modify_hosts
- [ ] modify_id_format - [ ] modify_id_format
- [ ] modify_identity_id_format - [ ] modify_identity_id_format
- [ ] modify_image_attribute - [X] modify_image_attribute
- [X] modify_instance_attribute - [X] modify_instance_attribute
- [ ] modify_instance_capacity_reservation_attributes - [ ] modify_instance_capacity_reservation_attributes
- [ ] modify_instance_credit_specification - [ ] modify_instance_credit_specification

View File

@ -545,7 +545,7 @@ ec2
- [X] modify_hosts - [X] modify_hosts
- [ ] modify_id_format - [ ] modify_id_format
- [ ] modify_identity_id_format - [ ] modify_identity_id_format
- [ ] modify_image_attribute - [X] modify_image_attribute
- [X] modify_instance_attribute - [X] modify_instance_attribute
- [ ] modify_instance_capacity_reservation_attributes - [ ] modify_instance_capacity_reservation_attributes
- [ ] modify_instance_credit_specification - [ ] modify_instance_credit_specification

View File

@ -101,11 +101,10 @@ class Ami(TaggedEC2Resource):
if not description: if not description:
self.description = source_ami.description self.description = source_ami.description
self.launch_permission_groups: Set[str] = set() self.launch_permissions: List[Dict[str, str]] = []
self.launch_permission_users: Set[str] = set()
if public: if public:
self.launch_permission_groups.add("all") self.launch_permissions.append({"Group": "all"})
# AWS auto-creates these, we should reflect the same. # AWS auto-creates these, we should reflect the same.
volume = self.ec2_backend.create_volume(size=15, zone_name=region_name) volume = self.ec2_backend.create_volume(size=15, zone_name=region_name)
@ -119,7 +118,7 @@ class Ami(TaggedEC2Resource):
@property @property
def is_public(self) -> bool: def is_public(self) -> bool:
return "all" in self.launch_permission_groups return {"Group": "all"} in self.launch_permissions
@property @property
def is_public_string(self) -> str: def is_public_string(self) -> str:
@ -273,8 +272,9 @@ class AmiBackend:
tmp_images = [] tmp_images = []
for ami in images: for ami in images:
for user_id in exec_users: for user_id in exec_users:
if user_id in ami.launch_permission_users: for lp in ami.launch_permissions:
tmp_images.append(ami) if lp.get("UserId") == user_id:
tmp_images.append(ami)
images = tmp_images images = tmp_images
# Limit by owner ids # Limit by owner ids
@ -305,38 +305,39 @@ class AmiBackend:
else: else:
raise InvalidAMIIdError(ami_id) raise InvalidAMIIdError(ami_id)
def validate_permission_targets( def validate_permission_targets(self, permissions: List[Dict[str, str]]) -> None:
self, user_ids: Optional[List[str]] = None, group: Optional[str] = None for perm in permissions:
) -> None: # If anything is invalid, nothing is added. (No partial success.)
# If anything is invalid, nothing is added. (No partial success.) # AWS docs:
if user_ids: # The AWS account ID is a 12-digit number, such as 123456789012, that you use to construct Amazon Resource Names (ARNs)."
""" # http://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html
AWS docs:
"The AWS account ID is a 12-digit number, such as 123456789012, that you use to construct Amazon Resource Names (ARNs)."
http://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html
"""
for user_id in user_ids:
if len(user_id) != 12 or not user_id.isdigit():
raise InvalidAMIAttributeItemValueError("userId", user_id)
if group and group != "all": if "UserId" in perm and (
raise InvalidAMIAttributeItemValueError("UserGroup", group) len(perm["UserId"]) != 12 or not perm["UserId"].isdigit()
):
raise InvalidAMIAttributeItemValueError("userId", perm["UserId"])
def add_launch_permission( if "Group" in perm and perm["Group"] != "all":
raise InvalidAMIAttributeItemValueError("UserGroup", perm["Group"])
def modify_image_attribute(
self, self,
ami_id: str, ami_id: str,
user_ids: Optional[List[str]] = None, launch_permissions_to_add: List[Dict[str, str]],
group: Optional[str] = None, launch_permissions_to_remove: List[Dict[str, str]],
) -> None: ) -> None:
ami = self.describe_images(ami_ids=[ami_id])[0] ami = self.describe_images(ami_ids=[ami_id])[0]
self.validate_permission_targets(user_ids=user_ids, group=group) self.validate_permission_targets(launch_permissions_to_add)
self.validate_permission_targets(launch_permissions_to_remove)
if user_ids: for lp in launch_permissions_to_add:
for user_id in user_ids: if lp not in ami.launch_permissions:
ami.launch_permission_users.add(user_id) ami.launch_permissions.append(lp)
for lp in launch_permissions_to_remove:
if group: try:
ami.launch_permission_groups.add(group) ami.launch_permissions.remove(lp)
except ValueError:
# The LaunchPermission may not exist
pass
def register_image( def register_image(
self, name: Optional[str] = None, description: Optional[str] = None self, name: Optional[str] = None, description: Optional[str] = None
@ -353,21 +354,5 @@ class AmiBackend:
self.amis[ami_id] = ami self.amis[ami_id] = ami
return ami return ami
def remove_launch_permission(
self,
ami_id: str,
user_ids: Optional[List[str]] = None,
group: Optional[str] = None,
) -> None:
ami = self.describe_images(ami_ids=[ami_id])[0]
self.validate_permission_targets(user_ids=user_ids, group=group)
if user_ids:
for user_id in user_ids:
ami.launch_permission_users.discard(user_id)
if group:
ami.launch_permission_groups.discard(group)
def describe_image_attribute(self, ami_id: str, attribute_name: str) -> Any: def describe_image_attribute(self, ami_id: str, attribute_name: str) -> Any:
return self.amis[ami_id].__getattribute__(attribute_name) return self.amis[ami_id].__getattribute__(attribute_name)

View File

@ -65,10 +65,7 @@ class AmisResponse(EC2BaseResponse):
"description": "description", "description": "description",
"kernel": "kernel_id", "kernel": "kernel_id",
"ramdisk": "ramdisk", "ramdisk": "ramdisk",
"launchPermission": { "launchPermission": "launch_permissions",
"groups": "launch_permission_groups",
"users": "launch_permission_users",
},
"productCodes": "product_codes", "productCodes": "product_codes",
"blockDeviceMapping": "bdm", "blockDeviceMapping": "bdm",
"sriovNetSupport": "sriov", "sriovNetSupport": "sriov",
@ -85,45 +82,63 @@ class AmisResponse(EC2BaseResponse):
# https://github.com/aws/aws-cli/issues/1083 # https://github.com/aws/aws-cli/issues/1083
raise AuthFailureRestricted raise AuthFailureRestricted
groups = None
users = None
attribute_value = None attribute_value = None
launch_permissions = None
if attribute_name == "launchPermission": if attribute_name == "launchPermission":
groups = self.ec2_backend.describe_image_attribute( launch_permissions = self.ec2_backend.describe_image_attribute(
ami_id, valid_atrributes_list[attribute_name]["groups"] # type: ignore[index] ami_id, valid_atrributes_list[attribute_name]
)
users = self.ec2_backend.describe_image_attribute(
ami_id, valid_atrributes_list[attribute_name]["users"] # type: ignore[index]
) )
else: else:
attribute_value = self.ec2_backend.describe_image_attribute( attribute_value = self.ec2_backend.describe_image_attribute(
ami_id, valid_atrributes_list[attribute_name] ami_id, valid_atrributes_list[attribute_name]
) )
template = self.response_template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE) template = self.response_template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE)
return template.render( return template.render(
ami_id=ami_id, ami_id=ami_id,
users=users, launch_permissions=launch_permissions,
groups=groups,
attribute_name=attribute_name, attribute_name=attribute_name,
attribute_value=attribute_value, attribute_value=attribute_value,
) )
def modify_image_attribute(self) -> str: def modify_image_attribute(self) -> str:
ami_id = self._get_param("ImageId") ami_id = self._get_param("ImageId")
launch_permissions_to_add = list(
self._get_params().get("LaunchPermission", {}).get("Add", {}).values()
)
launch_permissions_to_remove = list(
self._get_params().get("LaunchPermission", {}).get("Remove", {}).values()
)
# If only one OperationType is added, the other attributes are submitted as different variables
operation_type = self._get_param("OperationType") operation_type = self._get_param("OperationType")
group = self._get_param("UserGroup.1") if operation_type in ["add", "remove"]:
user_ids = self._get_multi_param("UserId") group = self._get_param("UserGroup.1")
lp = (
launch_permissions_to_add
if operation_type == "add"
else launch_permissions_to_remove
)
if group:
lp.append({"Group": group})
for user_id in self._get_multi_param("UserId"):
lp.append({"UserId": user_id})
org_arn = self._get_param("OrganizationArn.1")
if org_arn:
lp.append({"OrganizationArn": org_arn})
ou_arn = self._get_param("OrganizationalUnitArn.1")
if ou_arn:
lp.append({"OrganizationalUnitArn": ou_arn})
self.error_on_dryrun() self.error_on_dryrun()
if operation_type == "add": self.ec2_backend.modify_image_attribute(
self.ec2_backend.add_launch_permission( ami_id=ami_id,
ami_id, user_ids=user_ids, group=group launch_permissions_to_add=launch_permissions_to_add,
) launch_permissions_to_remove=launch_permissions_to_remove,
elif operation_type == "remove": )
self.ec2_backend.remove_launch_permission(
ami_id, user_ids=user_ids, group=group
)
return MODIFY_IMAGE_ATTRIBUTE_RESPONSE return MODIFY_IMAGE_ATTRIBUTE_RESPONSE
def register_image(self) -> str: def register_image(self) -> str:
@ -232,17 +247,13 @@ DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE = """
{% endfor %} {% endfor %}
{% endif %} {% endif %}
{% if attribute_name == 'launchPermission' %} {% if attribute_name == 'launchPermission' %}
{% if groups %} {% if launch_permissions %}
{% for group in groups %} {% for lp in launch_permissions %}
<item> <item>
<group>{{ group }}</group> {% if lp['UserId'] %}<userId>{{ lp['UserId'] }}</userId>{% endif %}
</item> {% if lp['Group'] %}<group>{{ lp['Group'] }}</group>{% endif %}
{% endfor %} {% if lp['OrganizationArn'] %}<organizationArn>{{ lp['OrganizationArn'] }}</organizationArn>{% endif %}
{% endif %} {% if lp['OrganizationalUnitArn'] %}<organizationalUnitArn>{{ lp['OrganizationalUnitArn'] }}</organizationalUnitArn>{% endif %}
{% if users %}
{% for user in users %}
<item>
<userId>{{ user }}</userId>
</item> </item>
{% endfor %} {% endfor %}
{% endif %} {% endif %}

View File

@ -426,6 +426,9 @@ def test_ami_filters():
] ]
imageB = boto3.resource("ec2", region_name="us-east-1").Image(imageB_id) imageB = boto3.resource("ec2", region_name="us-east-1").Image(imageB_id)
imageB.modify_attribute(LaunchPermission={"Add": [{"Group": "all"}]}) imageB.modify_attribute(LaunchPermission={"Add": [{"Group": "all"}]})
assert imageB.describe_attribute(Attribute="launchPermission")[
"LaunchPermissions"
] == [{"Group": "all"}]
amis_by_architecture = ec2.describe_images( amis_by_architecture = ec2.describe_images(
Filters=[{"Name": "architecture", "Values": ["x86_64"]}] Filters=[{"Name": "architecture", "Values": ["x86_64"]}]
@ -479,7 +482,6 @@ def test_ami_filters():
Filters=[{"Name": "is-public", "Values": ["false"]}] Filters=[{"Name": "is-public", "Values": ["false"]}]
)["Images"] )["Images"]
assert imageA.id in [ami["ImageId"] for ami in amis_by_nonpublic] assert imageA.id in [ami["ImageId"] for ami in amis_by_nonpublic]
assert len(amis_by_nonpublic) >= 2, "Should have at least 2 non-public images"
@mock_aws @mock_aws
@ -684,6 +686,53 @@ def test_ami_attribute_user_permissions():
image.modify_attribute(**REMOVE_USERS_ARGS) image.modify_attribute(**REMOVE_USERS_ARGS)
@mock_aws
def test_ami_attribute_organizations():
ec2 = boto3.client("ec2", region_name="us-east-1")
reservation = ec2.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
instance_id = reservation["Instances"][0]["InstanceId"]
image_id = ec2.create_image(InstanceId=instance_id, Name="test-ami-A")["ImageId"]
image = boto3.resource("ec2", "us-east-1").Image(image_id)
arn = "someOrganizationArn"
image.modify_attribute(
Attribute="launchPermission",
OperationType="add",
OrganizationArns=[arn],
)
image.modify_attribute(
Attribute="launchPermission",
OperationType="add",
OrganizationalUnitArns=["ou1"],
)
ec2.modify_image_attribute(
Attribute="launchPermission",
ImageId=image_id,
LaunchPermission={
"Add": [
{"UserId": "111122223333"},
{"UserId": "555566667777"},
{"Group": "all"},
{"OrganizationArn": "orgarn"},
{"OrganizationalUnitArn": "ou2"},
]
},
)
launch_permissions = image.describe_attribute(Attribute="launchPermission")[
"LaunchPermissions"
]
assert launch_permissions == [
{"OrganizationArn": "someOrganizationArn"},
{"OrganizationalUnitArn": "ou1"},
{"UserId": "111122223333"},
{"UserId": "555566667777"},
{"Group": "all"},
{"OrganizationArn": "orgarn"},
{"OrganizationalUnitArn": "ou2"},
]
@mock_aws @mock_aws
def test_ami_describe_executable_users(): def test_ami_describe_executable_users():
conn = boto3.client("ec2", region_name="us-east-1") conn = boto3.client("ec2", region_name="us-east-1")