Fix ami attributes (#6280)

* fix: describe_image attributes

* refactor: unifi attribute logic, add tests

* chore: mypy

* chore: linting

* chore: unused method

* chore: increase coverage
This commit is contained in:
rafcio19 2023-05-04 10:16:22 +01:00 committed by GitHub
parent c21c44136e
commit 763b582cf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 203 additions and 23 deletions

View File

@ -735,3 +735,15 @@ class UnknownVpcEndpointService(EC2ClientError):
"InvalidVpcEndpointServiceId.NotFound", "InvalidVpcEndpointServiceId.NotFound",
f"The VpcEndpointService Id '{service_id}' does not exist", f"The VpcEndpointService Id '{service_id}' does not exist",
) )
class AuthFailureRestricted(RESTError):
"""Replicate real world issue https://github.com/aws/aws-cli/issues/1083"""
code = 401
def __init__(self) -> None:
super().__init__(
"AuthFailure",
"Unauthorized attempt to access restricted resource",
)

View File

@ -1,13 +1,13 @@
import json import json
import re import re
from os import environ from os import environ
from typing import Any, Dict, List, Iterable, Optional, Set, cast from typing import Any, Dict, List, Optional, Set, cast
from moto.utilities.utils import load_resource from moto.utilities.utils import load_resource
from ..exceptions import ( from ..exceptions import (
InvalidAMIIdError, InvalidAMIIdError,
InvalidAMIAttributeItemValueError, InvalidAMIAttributeItemValueError,
MalformedAMIIdError,
InvalidTaggableResourceType, InvalidTaggableResourceType,
MalformedAMIIdError,
UnvailableAMIIdError, UnvailableAMIIdError,
) )
from .core import TaggedEC2Resource from .core import TaggedEC2Resource
@ -27,7 +27,7 @@ else:
class Ami(TaggedEC2Resource): class Ami(TaggedEC2Resource):
def __init__( def __init__( # pylint: disable=dangerous-default-value
self, self,
ec2_backend: Any, ec2_backend: Any,
ami_id: str, ami_id: str,
@ -51,6 +51,8 @@ class Ami(TaggedEC2Resource):
sriov: str = "simple", sriov: str = "simple",
region_name: str = "us-east-1a", region_name: str = "us-east-1a",
snapshot_description: Optional[str] = None, snapshot_description: Optional[str] = None,
product_codes: Set[str] = set(),
boot_mode: str = "uefi",
): ):
self.ec2_backend = ec2_backend self.ec2_backend = ec2_backend
self.id = ami_id self.id = ami_id
@ -70,6 +72,8 @@ class Ami(TaggedEC2Resource):
self.root_device_type = root_device_type self.root_device_type = root_device_type
self.sriov = sriov self.sriov = sriov
self.creation_date = creation_date or utc_date_and_time() self.creation_date = creation_date or utc_date_and_time()
self.product_codes = product_codes
self.boot_mode = boot_mode
if instance: if instance:
self.instance = instance self.instance = instance
@ -296,14 +300,6 @@ class AmiBackend:
else: else:
raise InvalidAMIIdError(ami_id) raise InvalidAMIIdError(ami_id)
def get_launch_permission_groups(self, ami_id: str) -> Iterable[str]:
ami = self.describe_images(ami_ids=[ami_id])[0]
return ami.launch_permission_groups
def get_launch_permission_users(self, ami_id: str) -> Iterable[str]:
ami = self.describe_images(ami_ids=[ami_id])[0]
return ami.launch_permission_users
def validate_permission_targets( def validate_permission_targets(
self, user_ids: Optional[List[str]] = None, group: Optional[str] = None self, user_ids: Optional[List[str]] = None, group: Optional[str] = None
) -> None: ) -> None:
@ -367,3 +363,6 @@ class AmiBackend:
if group: if group:
ami.launch_permission_groups.discard(group) ami.launch_permission_groups.discard(group)
def describe_image_attribute(self, ami_id: str, attribute_name: str) -> Any:
return self.amis[ami_id].__getattribute__(attribute_name)

View File

@ -729,5 +729,23 @@
"root_device_type": "ebs", "root_device_type": "ebs",
"sriov": "simple", "sriov": "simple",
"virtualization_type": "hvm" "virtualization_type": "hvm"
},
{
"architecture": "x86_64",
"ami_id": "ami-0b301ce3ce3475r4f",
"image_location": "amazon/al2022-ami-2022.0.20220728.1-kernel-6.0-x86_64",
"image_type": "machine",
"public": true,
"owner_id": "137112412989",
"platform": "Linux/UNIX",
"state": "available",
"description": "Test ami for product codes",
"hypervisor": "xen",
"name": "product_codes_test",
"root_device_name": "/dev/xvda",
"root_device_type": "ebs",
"sriov": "simple",
"virtualization_type": "hvm",
"product_codes": [ "code123", "code456" ]
} }
] ]

View File

@ -1,4 +1,5 @@
from ._base_response import EC2BaseResponse from ._base_response import EC2BaseResponse
from ..exceptions import AuthFailureRestricted, InvalidRequest
class AmisResponse(EC2BaseResponse): class AmisResponse(EC2BaseResponse):
@ -56,10 +57,56 @@ class AmisResponse(EC2BaseResponse):
def describe_image_attribute(self) -> str: def describe_image_attribute(self) -> str:
ami_id = self._get_param("ImageId") ami_id = self._get_param("ImageId")
groups = self.ec2_backend.get_launch_permission_groups(ami_id) attribute_name = self._get_param("Attribute")
users = self.ec2_backend.get_launch_permission_users(ami_id)
# only valid attributes as per
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/describe_image_attribute.html
valid_atrributes_list = {
"description": "description",
"kernel": "kernel_id",
"ramdisk": "ramdisk",
"launchPermission": {
"groups": "launch_permission_groups",
"users": "launch_permission_users",
},
"productCodes": "product_codes",
"blockDeviceMapping": "bdm",
"sriovNetSupport": "sriov",
"bootMode": "boot_mode",
"tpmSupport": "tmp",
"uefiData": "uefi",
"lastLaunchedTime": "lld",
"imdsSupport": "imds",
}
if attribute_name not in valid_atrributes_list:
raise InvalidRequest
elif attribute_name == "blockDeviceMapping":
# replicate real aws behaviour and throw and error
# https://github.com/aws/aws-cli/issues/1083
raise AuthFailureRestricted
groups = None
users = None
attribute_value = None
if attribute_name == "launchPermission":
groups = self.ec2_backend.describe_image_attribute(
ami_id, valid_atrributes_list[attribute_name]["groups"] # type: ignore[index]
)
users = self.ec2_backend.describe_image_attribute(
ami_id, valid_atrributes_list[attribute_name]["users"] # type: ignore[index]
)
else:
attribute_value = self.ec2_backend.describe_image_attribute(
ami_id, valid_atrributes_list[attribute_name]
)
template = self.response_template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE) template = self.response_template(DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE)
return template.render(ami_id=ami_id, groups=groups, users=users) return template.render(
ami_id=ami_id,
users=users,
groups=groups,
attribute_name=attribute_name,
attribute_value=attribute_value,
)
def modify_image_attribute(self) -> str: def modify_image_attribute(self) -> str:
ami_id = self._get_param("ImageId") ami_id = self._get_param("ImageId")
@ -175,10 +222,16 @@ DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE = """
<DescribeImageAttributeResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> <DescribeImageAttributeResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId> <requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
<imageId>{{ ami_id }}</imageId> <imageId>{{ ami_id }}</imageId>
{% if not groups and not users %} <{{ attribute_name }}>
<launchPermission/> {% if attribute_name == 'productCodes' %}
{% else %} {% for value in attribute_value %}
<launchPermission> <item>
<productCode>{{ value }}</productCode>
<type>marketplace</type>
</item>
{% endfor %}
{% endif %}
{% if attribute_name == 'launchPermission' %}
{% if groups %} {% if groups %}
{% for group in groups %} {% for group in groups %}
<item> <item>
@ -193,8 +246,11 @@ DESCRIBE_IMAGE_ATTRIBUTES_RESPONSE = """
</item> </item>
{% endfor %} {% endfor %}
{% endif %} {% endif %}
</launchPermission> {% endif %}
{% endif %} {% if attribute_name not in ['launchPermission', 'productCodes'] %}
<value>{{ attribute_value }}</value>
{% endif %}
</{{ attribute_name }}>
</DescribeImageAttributeResponse>""" </DescribeImageAttributeResponse>"""
MODIFY_IMAGE_ATTRIBUTE_RESPONSE = """ MODIFY_IMAGE_ATTRIBUTE_RESPONSE = """

View File

@ -686,7 +686,7 @@ def test_ami_describe_executable_users():
conn.modify_image_attribute(**ADD_USER_ARGS) conn.modify_image_attribute(**ADD_USER_ARGS)
attributes = conn.describe_image_attribute( attributes = conn.describe_image_attribute(
ImageId=image_id, Attribute="LaunchPermissions", DryRun=False ImageId=image_id, Attribute="launchPermission", DryRun=False
) )
attributes["LaunchPermissions"].should.have.length_of(1) attributes["LaunchPermissions"].should.have.length_of(1)
attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1) attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1)
@ -721,7 +721,7 @@ def test_ami_describe_executable_users_negative():
conn.modify_image_attribute(**ADD_USER_ARGS) conn.modify_image_attribute(**ADD_USER_ARGS)
attributes = conn.describe_image_attribute( attributes = conn.describe_image_attribute(
ImageId=image_id, Attribute="LaunchPermissions", DryRun=False ImageId=image_id, Attribute="launchPermission", DryRun=False
) )
attributes["LaunchPermissions"].should.have.length_of(1) attributes["LaunchPermissions"].should.have.length_of(1)
attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1) attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1)
@ -755,7 +755,7 @@ def test_ami_describe_executable_users_and_filter():
conn.modify_image_attribute(**ADD_USER_ARGS) conn.modify_image_attribute(**ADD_USER_ARGS)
attributes = conn.describe_image_attribute( attributes = conn.describe_image_attribute(
ImageId=image_id, Attribute="LaunchPermissions", DryRun=False ImageId=image_id, Attribute="launchPermission", DryRun=False
) )
attributes["LaunchPermissions"].should.have.length_of(1) attributes["LaunchPermissions"].should.have.length_of(1)
attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1) attributes["LaunchPermissions"][0]["UserId"].should.equal(USER1)
@ -1230,3 +1230,98 @@ def test_delete_snapshot_from_create_image():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
ec2_client.describe_snapshots(SnapshotIds=[snapshot_id]) ec2_client.describe_snapshots(SnapshotIds=[snapshot_id])
exc.value.response["Error"]["Code"].should.equal("InvalidSnapshot.NotFound") exc.value.response["Error"]["Code"].should.equal("InvalidSnapshot.NotFound")
@mock_ec2
def test_ami_describe_image_attribute_product_codes():
# Setup
conn = boto3.client("ec2", region_name="us-east-1")
# test ami loaded from moto/ec2/resources/ami.json
test_image = conn.describe_images(
Filters=[{"Name": "name", "Values": ["product_codes_test"]}]
)
image_id = test_image["Images"][0]["ImageId"]
expected_codes = [
{"ProductCodeId": "code123", "ProductCodeType": "marketplace"},
{"ProductCodeId": "code456", "ProductCodeType": "marketplace"},
]
# Execute
attributes = conn.describe_image_attribute(
ImageId=image_id, Attribute="productCodes", DryRun=False
)
# Verify
assert "ProductCodes" in attributes
assert len(attributes["ProductCodes"]) == 2
assert attributes["ProductCodes"] == expected_codes
@mock_ec2
def test_ami_describe_image_attribute():
# Setup
conn = boto3.client("ec2", region_name="us-east-1")
# test ami loaded from moto/ec2/resources/ami.json
test_image = conn.describe_images(
Filters=[{"Name": "name", "Values": ["product_codes_test"]}]
)
image_id = test_image["Images"][0]["ImageId"]
# Execute
description = conn.describe_image_attribute(
ImageId=image_id, Attribute="description", DryRun=False
)
boot_mode = conn.describe_image_attribute(
ImageId=image_id, Attribute="bootMode", DryRun=False
)
sriov = conn.describe_image_attribute(
ImageId=image_id, Attribute="sriovNetSupport", DryRun=False
)
# Verify
assert "Description" in description
assert description["Description"]["Value"] == "Test ami for product codes"
assert "BootMode" in boot_mode
assert boot_mode["BootMode"]["Value"] == "uefi"
assert "SriovNetSupport" in sriov
assert sriov["SriovNetSupport"]["Value"] == "simple"
@mock_ec2
def test_ami_describe_image_attribute_block_device_fail():
# Setup
conn = boto3.client("ec2", region_name="us-east-1")
test_image = conn.describe_images()
image_id = test_image["Images"][0]["ImageId"]
# Execute
with pytest.raises(ClientError) as e:
conn.describe_image_attribute(
ImageId=image_id, Attribute="blockDeviceMapping", DryRun=False
)
# Verify
assert e.value.response["Error"]["Code"] == "AuthFailure"
assert (
e.value.response["Error"]["Message"]
== "Unauthorized attempt to access restricted resource"
)
@mock_ec2
def test_ami_describe_image_attribute_invalid_param():
# Setup
conn = boto3.client("ec2", region_name="us-east-1")
test_image = conn.describe_images()
image_id = test_image["Images"][0]["ImageId"]
# Execute
with pytest.raises(ClientError) as e:
conn.describe_image_attribute(
ImageId=image_id, Attribute="invalid", DryRun=False
)
# Verify
assert e.value.response["Error"]["Code"] == "InvalidRequest"
assert e.value.response["Error"]["Message"] == "The request received was invalid"