diff --git a/moto/ec2/exceptions.py b/moto/ec2/exceptions.py index fce22c1bb..6a93607d0 100644 --- a/moto/ec2/exceptions.py +++ b/moto/ec2/exceptions.py @@ -735,3 +735,15 @@ class UnknownVpcEndpointService(EC2ClientError): "InvalidVpcEndpointServiceId.NotFound", f"The VpcEndpointService Id '{service_id}' does not exist", ) + + +class AuthFailureRestricted(RESTError): + """Replicate real world issue https://github.com/aws/aws-cli/issues/1083""" + + code = 401 + + def __init__(self) -> None: + super().__init__( + "AuthFailure", + "Unauthorized attempt to access restricted resource", + ) diff --git a/moto/ec2/models/amis.py b/moto/ec2/models/amis.py index 8579d3c65..c1b041b72 100644 --- a/moto/ec2/models/amis.py +++ b/moto/ec2/models/amis.py @@ -1,13 +1,13 @@ import json import re from os import environ -from typing import Any, Dict, List, Iterable, Optional, Set, cast +from typing import Any, Dict, List, Optional, Set, cast from moto.utilities.utils import load_resource from ..exceptions import ( InvalidAMIIdError, InvalidAMIAttributeItemValueError, - MalformedAMIIdError, InvalidTaggableResourceType, + MalformedAMIIdError, UnvailableAMIIdError, ) from .core import TaggedEC2Resource @@ -27,7 +27,7 @@ else: class Ami(TaggedEC2Resource): - def __init__( + def __init__( # pylint: disable=dangerous-default-value self, ec2_backend: Any, ami_id: str, @@ -51,6 +51,8 @@ class Ami(TaggedEC2Resource): sriov: str = "simple", region_name: str = "us-east-1a", snapshot_description: Optional[str] = None, + product_codes: Set[str] = set(), + boot_mode: str = "uefi", ): self.ec2_backend = ec2_backend self.id = ami_id @@ -70,6 +72,8 @@ class Ami(TaggedEC2Resource): self.root_device_type = root_device_type self.sriov = sriov self.creation_date = creation_date or utc_date_and_time() + self.product_codes = product_codes + self.boot_mode = boot_mode if instance: self.instance = instance @@ -296,14 +300,6 @@ class AmiBackend: else: raise InvalidAMIIdError(ami_id) - def get_launch_permission_groups(self, ami_id: str) -> Iterable[str]: - ami = self.describe_images(ami_ids=[ami_id])[0] - return ami.launch_permission_groups - - def get_launch_permission_users(self, ami_id: str) -> Iterable[str]: - ami = self.describe_images(ami_ids=[ami_id])[0] - return ami.launch_permission_users - def validate_permission_targets( self, user_ids: Optional[List[str]] = None, group: Optional[str] = None ) -> None: @@ -367,3 +363,6 @@ class AmiBackend: if group: ami.launch_permission_groups.discard(group) + + def describe_image_attribute(self, ami_id: str, attribute_name: str) -> Any: + return self.amis[ami_id].__getattribute__(attribute_name) diff --git a/moto/ec2/resources/amis.json b/moto/ec2/resources/amis.json index 749409e91..57c5f51f5 100644 --- a/moto/ec2/resources/amis.json +++ b/moto/ec2/resources/amis.json @@ -729,5 +729,23 @@ "root_device_type": "ebs", "sriov": "simple", "virtualization_type": "hvm" + }, + { + "architecture": "x86_64", + "ami_id": "ami-0b301ce3ce3475r4f", + "image_location": "amazon/al2022-ami-2022.0.20220728.1-kernel-6.0-x86_64", + "image_type": "machine", + "public": true, + "owner_id": "137112412989", + "platform": "Linux/UNIX", + "state": "available", + "description": "Test ami for product codes", + "hypervisor": "xen", + "name": "product_codes_test", + "root_device_name": "/dev/xvda", + "root_device_type": "ebs", + "sriov": "simple", + "virtualization_type": "hvm", + "product_codes": [ "code123", "code456" ] } ] \ No newline at end of file diff --git a/moto/ec2/responses/amis.py b/moto/ec2/responses/amis.py index a950131ef..77ef8584c 100644 --- a/moto/ec2/responses/amis.py +++ b/moto/ec2/responses/amis.py @@ -1,4 +1,5 @@ from ._base_response import EC2BaseResponse +from ..exceptions import AuthFailureRestricted, InvalidRequest class AmisResponse(EC2BaseResponse): @@ -56,10 +57,56 @@ class AmisResponse(EC2BaseResponse): def describe_image_attribute(self) -> str: ami_id = self._get_param("ImageId") - groups = self.ec2_backend.get_launch_permission_groups(ami_id) - users = self.ec2_backend.get_launch_permission_users(ami_id) + attribute_name = self._get_param("Attribute") + + # only valid attributes as per + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/describe_image_attribute.html + valid_atrributes_list = { + "description": "description", + "kernel": "kernel_id", + "ramdisk": "ramdisk", + "launchPermission": { + "groups": "launch_permission_groups", + "users": "launch_permission_users", + }, + "productCodes": "product_codes", + "blockDeviceMapping": "bdm", + "sriovNetSupport": "sriov", + "bootMode": "boot_mode", + "tpmSupport": "tmp", + "uefiData": "uefi", + "lastLaunchedTime": "lld", + "imdsSupport": "imds", + } + if attribute_name not in valid_atrributes_list: + raise InvalidRequest + elif attribute_name == "blockDeviceMapping": + # replicate real aws behaviour and throw and error + # https://github.com/aws/aws-cli/issues/1083 + raise AuthFailureRestricted + + groups = None + users = None + attribute_value = None + if attribute_name == "launchPermission": + groups = self.ec2_backend.describe_image_attribute( + ami_id, valid_atrributes_list[attribute_name]["groups"] # type: ignore[index] + ) + users = self.ec2_backend.describe_image_attribute( + ami_id, valid_atrributes_list[attribute_name]["users"] # type: ignore[index] + ) + else: + attribute_value = self.ec2_backend.describe_image_attribute( + ami_id, valid_atrributes_list[attribute_name] + ) template = self.response_template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE) - return template.render(ami_id=ami_id, groups=groups, users=users) + return template.render( + ami_id=ami_id, + users=users, + groups=groups, + attribute_name=attribute_name, + attribute_value=attribute_value, + ) def modify_image_attribute(self) -> str: ami_id = self._get_param("ImageId") @@ -175,10 +222,16 @@ DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE = """ 59dbff89-35bd-4eac-99ed-be587EXAMPLE {{ ami_id }} - {% if not groups and not users %} - - {% else %} - + <{{ attribute_name }}> + {% if attribute_name == 'productCodes' %} + {% for value in attribute_value %} + + {{ value }} + marketplace + + {% endfor %} + {% endif %} + {% if attribute_name == 'launchPermission' %} {% if groups %} {% for group in groups %} @@ -193,8 +246,11 @@ DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE = """ {% endfor %} {% endif %} - - {% endif %} + {% endif %} + {% if attribute_name not in ['launchPermission', 'productCodes'] %} + {{ attribute_value }} + {% endif %} + """ MODIFY_IMAGE_ATTRIBUTE_RESPONSE = """ diff --git a/tests/test_ec2/test_amis.py b/tests/test_ec2/test_amis.py index dbfd2562d..24c2c28e9 100644 --- a/tests/test_ec2/test_amis.py +++ b/tests/test_ec2/test_amis.py @@ -686,7 +686,7 @@ def test_ami_describe_executable_users(): conn.modify_image_attribute(**ADD_USER_ARGS) attributes = conn.describe_image_attribute( - ImageId=image_id, Attribute="LaunchPermissions", DryRun=False + ImageId=image_id, Attribute="launchPermission", DryRun=False ) attributes["LaunchPermissions"].should.have.length_of(1) attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1) @@ -721,7 +721,7 @@ def test_ami_describe_executable_users_negative(): conn.modify_image_attribute(**ADD_USER_ARGS) attributes = conn.describe_image_attribute( - ImageId=image_id, Attribute="LaunchPermissions", DryRun=False + ImageId=image_id, Attribute="launchPermission", DryRun=False ) attributes["LaunchPermissions"].should.have.length_of(1) attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1) @@ -755,7 +755,7 @@ def test_ami_describe_executable_users_and_filter(): conn.modify_image_attribute(**ADD_USER_ARGS) attributes = conn.describe_image_attribute( - ImageId=image_id, Attribute="LaunchPermissions", DryRun=False + ImageId=image_id, Attribute="launchPermission", DryRun=False ) attributes["LaunchPermissions"].should.have.length_of(1) attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1) @@ -1230,3 +1230,98 @@ def test_delete_snapshot_from_create_image(): with pytest.raises(ClientError) as exc: ec2_client.describe_snapshots(SnapshotIds=[snapshot_id]) exc.value.response["Error"]["Code"].should.equal("InvalidSnapshot.NotFound") + + +@mock_ec2 +def test_ami_describe_image_attribute_product_codes(): + # Setup + conn = boto3.client("ec2", region_name="us-east-1") + + # test ami loaded from moto/ec2/resources/ami.json + test_image = conn.describe_images( + Filters=[{"Name": "name", "Values": ["product_codes_test"]}] + ) + image_id = test_image["Images"][0]["ImageId"] + expected_codes = [ + {"ProductCodeId": "code123", "ProductCodeType": "marketplace"}, + {"ProductCodeId": "code456", "ProductCodeType": "marketplace"}, + ] + # Execute + attributes = conn.describe_image_attribute( + ImageId=image_id, Attribute="productCodes", DryRun=False + ) + + # Verify + assert "ProductCodes" in attributes + assert len(attributes["ProductCodes"]) == 2 + assert attributes["ProductCodes"] == expected_codes + + +@mock_ec2 +def test_ami_describe_image_attribute(): + # Setup + conn = boto3.client("ec2", region_name="us-east-1") + + # test ami loaded from moto/ec2/resources/ami.json + test_image = conn.describe_images( + Filters=[{"Name": "name", "Values": ["product_codes_test"]}] + ) + image_id = test_image["Images"][0]["ImageId"] + + # Execute + description = conn.describe_image_attribute( + ImageId=image_id, Attribute="description", DryRun=False + ) + boot_mode = conn.describe_image_attribute( + ImageId=image_id, Attribute="bootMode", DryRun=False + ) + sriov = conn.describe_image_attribute( + ImageId=image_id, Attribute="sriovNetSupport", DryRun=False + ) + + # Verify + assert "Description" in description + assert description["Description"]["Value"] == "Test ami for product codes" + assert "BootMode" in boot_mode + assert boot_mode["BootMode"]["Value"] == "uefi" + assert "SriovNetSupport" in sriov + assert sriov["SriovNetSupport"]["Value"] == "simple" + + +@mock_ec2 +def test_ami_describe_image_attribute_block_device_fail(): + # Setup + conn = boto3.client("ec2", region_name="us-east-1") + test_image = conn.describe_images() + image_id = test_image["Images"][0]["ImageId"] + + # Execute + with pytest.raises(ClientError) as e: + conn.describe_image_attribute( + ImageId=image_id, Attribute="blockDeviceMapping", DryRun=False + ) + + # Verify + assert e.value.response["Error"]["Code"] == "AuthFailure" + assert ( + e.value.response["Error"]["Message"] + == "Unauthorized attempt to access restricted resource" + ) + + +@mock_ec2 +def test_ami_describe_image_attribute_invalid_param(): + # Setup + conn = boto3.client("ec2", region_name="us-east-1") + test_image = conn.describe_images() + image_id = test_image["Images"][0]["ImageId"] + + # Execute + with pytest.raises(ClientError) as e: + conn.describe_image_attribute( + ImageId=image_id, Attribute="invalid", DryRun=False + ) + + # Verify + assert e.value.response["Error"]["Code"] == "InvalidRequest" + assert e.value.response["Error"]["Message"] == "The request received was invalid"