EC2: Simplify DryRun-handling (#5926)

This commit is contained in:
Bert Blommers 2023-02-14 12:43:28 -01:00 committed by GitHub
parent c50737d027
commit 78840fd71c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 810 additions and 725 deletions

View File

@ -838,14 +838,10 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return "JSON" in self.querystring.get("ContentType", []) return "JSON" in self.querystring.get("ContentType", [])
def error_on_dryrun(self) -> None: def error_on_dryrun(self) -> None:
self.is_not_dryrun()
def is_not_dryrun(self, action: Optional[str] = None) -> bool:
if "true" in self.querystring.get("DryRun", ["false"]): if "true" in self.querystring.get("DryRun", ["false"]):
a = action or self._get_param("Action") a = self._get_param("Action")
message = f"An error occurred (DryRunOperation) when calling the {a} operation: Request would have succeeded, but DryRun flag is set" message = f"An error occurred (DryRunOperation) when calling the {a} operation: Request would have succeeded, but DryRun flag is set"
raise DryRunClientError(error_type="DryRunOperation", message=message) raise DryRunClientError(error_type="DryRunOperation", message=message)
return True
class _RecursiveDictRef(object): class _RecursiveDictRef(object):

View File

@ -7,34 +7,40 @@ class AmisResponse(EC2BaseResponse):
description = self._get_param("Description", if_none="") description = self._get_param("Description", if_none="")
instance_id = self._get_param("InstanceId") instance_id = self._get_param("InstanceId")
tag_specifications = self._get_multi_param("TagSpecification") tag_specifications = self._get_multi_param("TagSpecification")
if self.is_not_dryrun("CreateImage"):
image = self.ec2_backend.create_image( self.error_on_dryrun()
instance_id,
name, image = self.ec2_backend.create_image(
description, instance_id,
tag_specifications=tag_specifications, name,
) description,
template = self.response_template(CREATE_IMAGE_RESPONSE) tag_specifications=tag_specifications,
return template.render(image=image) )
template = self.response_template(CREATE_IMAGE_RESPONSE)
return template.render(image=image)
def copy_image(self): def copy_image(self):
source_image_id = self._get_param("SourceImageId") source_image_id = self._get_param("SourceImageId")
source_region = self._get_param("SourceRegion") source_region = self._get_param("SourceRegion")
name = self._get_param("Name") name = self._get_param("Name")
description = self._get_param("Description") description = self._get_param("Description")
if self.is_not_dryrun("CopyImage"):
image = self.ec2_backend.copy_image( self.error_on_dryrun()
source_image_id, source_region, name, description
) image = self.ec2_backend.copy_image(
template = self.response_template(COPY_IMAGE_RESPONSE) source_image_id, source_region, name, description
return template.render(image=image) )
template = self.response_template(COPY_IMAGE_RESPONSE)
return template.render(image=image)
def deregister_image(self): def deregister_image(self):
ami_id = self._get_param("ImageId") ami_id = self._get_param("ImageId")
if self.is_not_dryrun("DeregisterImage"):
self.ec2_backend.deregister_image(ami_id) self.error_on_dryrun()
template = self.response_template(DEREGISTER_IMAGE_RESPONSE)
return template.render(success="true") self.ec2_backend.deregister_image(ami_id)
template = self.response_template(DEREGISTER_IMAGE_RESPONSE)
return template.render(success="true")
def describe_images(self): def describe_images(self):
self.error_on_dryrun() self.error_on_dryrun()
@ -60,30 +66,33 @@ class AmisResponse(EC2BaseResponse):
operation_type = self._get_param("OperationType") operation_type = self._get_param("OperationType")
group = self._get_param("UserGroup.1") group = self._get_param("UserGroup.1")
user_ids = self._get_multi_param("UserId") user_ids = self._get_multi_param("UserId")
if self.is_not_dryrun("ModifyImageAttribute"):
if operation_type == "add": self.error_on_dryrun()
self.ec2_backend.add_launch_permission(
ami_id, user_ids=user_ids, group=group if operation_type == "add":
) self.ec2_backend.add_launch_permission(
elif operation_type == "remove": ami_id, user_ids=user_ids, group=group
self.ec2_backend.remove_launch_permission( )
ami_id, user_ids=user_ids, group=group elif operation_type == "remove":
) self.ec2_backend.remove_launch_permission(
return MODIFY_IMAGE_ATTRIBUTE_RESPONSE ami_id, user_ids=user_ids, group=group
)
return MODIFY_IMAGE_ATTRIBUTE_RESPONSE
def register_image(self): def register_image(self):
name = self.querystring.get("Name")[0] name = self.querystring.get("Name")[0]
description = self._get_param("Description", if_none="") description = self._get_param("Description", if_none="")
if self.is_not_dryrun("RegisterImage"):
image = self.ec2_backend.register_image(name, description) self.error_on_dryrun()
template = self.response_template(REGISTER_IMAGE_RESPONSE)
return template.render(image=image) image = self.ec2_backend.register_image(name, description)
template = self.response_template(REGISTER_IMAGE_RESPONSE)
return template.render(image=image)
def reset_image_attribute(self): def reset_image_attribute(self):
if self.is_not_dryrun("ResetImageAttribute"): self.error_on_dryrun()
raise NotImplementedError(
"AMIs.reset_image_attribute is not yet implemented" raise NotImplementedError("AMIs.reset_image_attribute is not yet implemented")
)
CREATE_IMAGE_RESPONSE = """<CreateImageResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> CREATE_IMAGE_RESPONSE = """<CreateImageResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -6,12 +6,12 @@ class ElasticBlockStore(EC2BaseResponse):
volume_id = self._get_param("VolumeId") volume_id = self._get_param("VolumeId")
instance_id = self._get_param("InstanceId") instance_id = self._get_param("InstanceId")
device_path = self._get_param("Device") device_path = self._get_param("Device")
if self.is_not_dryrun("AttachVolume"):
attachment = self.ec2_backend.attach_volume( self.error_on_dryrun()
volume_id, instance_id, device_path
) attachment = self.ec2_backend.attach_volume(volume_id, instance_id, device_path)
template = self.response_template(ATTACHED_VOLUME_RESPONSE) template = self.response_template(ATTACHED_VOLUME_RESPONSE)
return template.render(attachment=attachment) return template.render(attachment=attachment)
def copy_snapshot(self): def copy_snapshot(self):
source_snapshot_id = self._get_param("SourceSnapshotId") source_snapshot_id = self._get_param("SourceSnapshotId")
@ -19,24 +19,28 @@ class ElasticBlockStore(EC2BaseResponse):
description = self._get_param("Description") description = self._get_param("Description")
tags = self._parse_tag_specification() tags = self._parse_tag_specification()
snapshot_tags = tags.get("snapshot", {}) snapshot_tags = tags.get("snapshot", {})
if self.is_not_dryrun("CopySnapshot"):
snapshot = self.ec2_backend.copy_snapshot( self.error_on_dryrun()
source_snapshot_id, source_region, description
) snapshot = self.ec2_backend.copy_snapshot(
snapshot.add_tags(snapshot_tags) source_snapshot_id, source_region, description
template = self.response_template(COPY_SNAPSHOT_RESPONSE) )
return template.render(snapshot=snapshot) snapshot.add_tags(snapshot_tags)
template = self.response_template(COPY_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot)
def create_snapshot(self): def create_snapshot(self):
volume_id = self._get_param("VolumeId") volume_id = self._get_param("VolumeId")
description = self._get_param("Description") description = self._get_param("Description")
tags = self._parse_tag_specification() tags = self._parse_tag_specification()
snapshot_tags = tags.get("snapshot", {}) snapshot_tags = tags.get("snapshot", {})
if self.is_not_dryrun("CreateSnapshot"):
snapshot = self.ec2_backend.create_snapshot(volume_id, description) self.error_on_dryrun()
snapshot.add_tags(snapshot_tags)
template = self.response_template(CREATE_SNAPSHOT_RESPONSE) snapshot = self.ec2_backend.create_snapshot(volume_id, description)
return template.render(snapshot=snapshot) snapshot.add_tags(snapshot_tags)
template = self.response_template(CREATE_SNAPSHOT_RESPONSE)
return template.render(snapshot=snapshot)
def create_snapshots(self): def create_snapshots(self):
params = self._get_params() params = self._get_params()
@ -45,12 +49,13 @@ class ElasticBlockStore(EC2BaseResponse):
tags = self._parse_tag_specification() tags = self._parse_tag_specification()
snapshot_tags = tags.get("snapshot", {}) snapshot_tags = tags.get("snapshot", {})
if self.is_not_dryrun("CreateSnapshots"): self.error_on_dryrun()
snapshots = self.ec2_backend.create_snapshots(
instance_spec, description, snapshot_tags snapshots = self.ec2_backend.create_snapshots(
) instance_spec, description, snapshot_tags
template = self.response_template(CREATE_SNAPSHOTS_RESPONSE) )
return template.render(snapshots=snapshots) template = self.response_template(CREATE_SNAPSHOTS_RESPONSE)
return template.render(snapshots=snapshots)
def create_volume(self): def create_volume(self):
size = self._get_param("Size") size = self._get_param("Size")
@ -62,31 +67,34 @@ class ElasticBlockStore(EC2BaseResponse):
encrypted = self._get_bool_param("Encrypted", if_none=False) encrypted = self._get_bool_param("Encrypted", if_none=False)
kms_key_id = self._get_param("KmsKeyId") kms_key_id = self._get_param("KmsKeyId")
iops = self._get_param("Iops") iops = self._get_param("Iops")
if self.is_not_dryrun("CreateVolume"):
volume = self.ec2_backend.create_volume( self.error_on_dryrun()
size=size,
zone_name=zone, volume = self.ec2_backend.create_volume(
snapshot_id=snapshot_id, size=size,
encrypted=encrypted, zone_name=zone,
kms_key_id=kms_key_id, snapshot_id=snapshot_id,
volume_type=volume_type, encrypted=encrypted,
iops=iops, kms_key_id=kms_key_id,
) volume_type=volume_type,
volume.add_tags(volume_tags) iops=iops,
template = self.response_template(CREATE_VOLUME_RESPONSE) )
return template.render(volume=volume) volume.add_tags(volume_tags)
template = self.response_template(CREATE_VOLUME_RESPONSE)
return template.render(volume=volume)
def modify_volume(self): def modify_volume(self):
volume_id = self._get_param("VolumeId") volume_id = self._get_param("VolumeId")
target_size = self._get_param("Size") target_size = self._get_param("Size")
target_volume_type = self._get_param("VolumeType") target_volume_type = self._get_param("VolumeType")
if self.is_not_dryrun("ModifyVolume"): self.error_on_dryrun()
volume = self.ec2_backend.modify_volume(
volume_id, target_size, target_volume_type volume = self.ec2_backend.modify_volume(
) volume_id, target_size, target_volume_type
template = self.response_template(MODIFY_VOLUME_RESPONSE) )
return template.render(volume=volume) template = self.response_template(MODIFY_VOLUME_RESPONSE)
return template.render(volume=volume)
def describe_volumes_modifications(self): def describe_volumes_modifications(self):
filters = self._filters_from_querystring() filters = self._filters_from_querystring()
@ -99,15 +107,19 @@ class ElasticBlockStore(EC2BaseResponse):
def delete_snapshot(self): def delete_snapshot(self):
snapshot_id = self._get_param("SnapshotId") snapshot_id = self._get_param("SnapshotId")
if self.is_not_dryrun("DeleteSnapshot"):
self.ec2_backend.delete_snapshot(snapshot_id) self.error_on_dryrun()
return DELETE_SNAPSHOT_RESPONSE
self.ec2_backend.delete_snapshot(snapshot_id)
return DELETE_SNAPSHOT_RESPONSE
def delete_volume(self): def delete_volume(self):
volume_id = self._get_param("VolumeId") volume_id = self._get_param("VolumeId")
if self.is_not_dryrun("DeleteVolume"):
self.ec2_backend.delete_volume(volume_id) self.error_on_dryrun()
return DELETE_VOLUME_RESPONSE
self.ec2_backend.delete_volume(volume_id)
return DELETE_VOLUME_RESPONSE
def describe_snapshots(self): def describe_snapshots(self):
filters = self._filters_from_querystring() filters = self._filters_from_querystring()
@ -141,24 +153,26 @@ class ElasticBlockStore(EC2BaseResponse):
volume_id = self._get_param("VolumeId") volume_id = self._get_param("VolumeId")
instance_id = self._get_param("InstanceId") instance_id = self._get_param("InstanceId")
device_path = self._get_param("Device") device_path = self._get_param("Device")
if self.is_not_dryrun("DetachVolume"):
attachment = self.ec2_backend.detach_volume( self.error_on_dryrun()
volume_id, instance_id, device_path
) attachment = self.ec2_backend.detach_volume(volume_id, instance_id, device_path)
template = self.response_template(DETATCH_VOLUME_RESPONSE) template = self.response_template(DETATCH_VOLUME_RESPONSE)
return template.render(attachment=attachment) return template.render(attachment=attachment)
def enable_volume_io(self): def enable_volume_io(self):
if self.is_not_dryrun("EnableVolumeIO"): self.error_on_dryrun()
raise NotImplementedError(
"ElasticBlockStore.enable_volume_io is not yet implemented" raise NotImplementedError(
) "ElasticBlockStore.enable_volume_io is not yet implemented"
)
def import_volume(self): def import_volume(self):
if self.is_not_dryrun("ImportVolume"): self.error_on_dryrun()
raise NotImplementedError(
"ElasticBlockStore.import_volume is not yet implemented" raise NotImplementedError(
) "ElasticBlockStore.import_volume is not yet implemented"
)
def describe_snapshot_attribute(self): def describe_snapshot_attribute(self):
snapshot_id = self._get_param("SnapshotId") snapshot_id = self._get_param("SnapshotId")
@ -172,28 +186,32 @@ class ElasticBlockStore(EC2BaseResponse):
operation_type = self._get_param("OperationType") operation_type = self._get_param("OperationType")
groups = self._get_multi_param("UserGroup") groups = self._get_multi_param("UserGroup")
user_ids = self._get_multi_param("UserId") user_ids = self._get_multi_param("UserId")
if self.is_not_dryrun("ModifySnapshotAttribute"):
if operation_type == "add": self.error_on_dryrun()
self.ec2_backend.add_create_volume_permission(
snapshot_id, user_ids=user_ids, groups=groups if operation_type == "add":
) self.ec2_backend.add_create_volume_permission(
elif operation_type == "remove": snapshot_id, user_ids=user_ids, groups=groups
self.ec2_backend.remove_create_volume_permission( )
snapshot_id, user_ids=user_ids, groups=groups elif operation_type == "remove":
) self.ec2_backend.remove_create_volume_permission(
return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE snapshot_id, user_ids=user_ids, groups=groups
)
return MODIFY_SNAPSHOT_ATTRIBUTE_RESPONSE
def modify_volume_attribute(self): def modify_volume_attribute(self):
if self.is_not_dryrun("ModifyVolumeAttribute"): self.error_on_dryrun()
raise NotImplementedError(
"ElasticBlockStore.modify_volume_attribute is not yet implemented" raise NotImplementedError(
) "ElasticBlockStore.modify_volume_attribute is not yet implemented"
)
def reset_snapshot_attribute(self): def reset_snapshot_attribute(self):
if self.is_not_dryrun("ResetSnapshotAttribute"): self.error_on_dryrun()
raise NotImplementedError(
"ElasticBlockStore.reset_snapshot_attribute is not yet implemented" raise NotImplementedError(
) "ElasticBlockStore.reset_snapshot_attribute is not yet implemented"
)
CREATE_VOLUME_RESPONSE = """<CreateVolumeResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> CREATE_VOLUME_RESPONSE = """<CreateVolumeResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -9,15 +9,16 @@ class ElasticIPAddresses(EC2BaseResponse):
tags = self._get_multi_param("TagSpecification") tags = self._get_multi_param("TagSpecification")
tags = add_tag_specification(tags) tags = add_tag_specification(tags)
if self.is_not_dryrun("AllocateAddress"): self.error_on_dryrun()
if reallocate_address:
address = self.ec2_backend.allocate_address( if reallocate_address:
domain, address=reallocate_address, tags=tags address = self.ec2_backend.allocate_address(
) domain, address=reallocate_address, tags=tags
else: )
address = self.ec2_backend.allocate_address(domain, tags=tags) else:
template = self.response_template(ALLOCATE_ADDRESS_RESPONSE) address = self.ec2_backend.allocate_address(domain, tags=tags)
return template.render(address=address) template = self.response_template(ALLOCATE_ADDRESS_RESPONSE)
return template.render(address=address)
def associate_address(self): def associate_address(self):
instance = eni = None instance = eni = None
@ -38,35 +39,36 @@ class ElasticIPAddresses(EC2BaseResponse):
if "AllowReassociation" in self.querystring: if "AllowReassociation" in self.querystring:
reassociate = self._get_param("AllowReassociation") == "true" reassociate = self._get_param("AllowReassociation") == "true"
if self.is_not_dryrun("AssociateAddress"): self.error_on_dryrun()
if instance or eni:
if "PublicIp" in self.querystring: if instance or eni:
eip = self.ec2_backend.associate_address( if "PublicIp" in self.querystring:
instance=instance, eip = self.ec2_backend.associate_address(
eni=eni, instance=instance,
address=self._get_param("PublicIp"), eni=eni,
reassociate=reassociate, address=self._get_param("PublicIp"),
) reassociate=reassociate,
elif "AllocationId" in self.querystring: )
eip = self.ec2_backend.associate_address( elif "AllocationId" in self.querystring:
instance=instance, eip = self.ec2_backend.associate_address(
eni=eni, instance=instance,
allocation_id=self._get_param("AllocationId"), eni=eni,
reassociate=reassociate, allocation_id=self._get_param("AllocationId"),
) reassociate=reassociate,
else: )
self.ec2_backend.raise_error(
"MissingParameter",
"Invalid request, expect PublicIp/AllocationId parameter.",
)
else: else:
self.ec2_backend.raise_error( self.ec2_backend.raise_error(
"MissingParameter", "MissingParameter",
"Invalid request, expect either instance or ENI.", "Invalid request, expect PublicIp/AllocationId parameter.",
) )
else:
self.ec2_backend.raise_error(
"MissingParameter",
"Invalid request, expect either instance or ENI.",
)
template = self.response_template(ASSOCIATE_ADDRESS_RESPONSE) template = self.response_template(ASSOCIATE_ADDRESS_RESPONSE)
return template.render(address=eip) return template.render(address=eip)
def describe_addresses(self): def describe_addresses(self):
self.error_on_dryrun() self.error_on_dryrun()
@ -80,38 +82,46 @@ class ElasticIPAddresses(EC2BaseResponse):
return template.render(addresses=addresses) return template.render(addresses=addresses)
def disassociate_address(self): def disassociate_address(self):
if self.is_not_dryrun("DisAssociateAddress"): if (
if "PublicIp" in self.querystring: "PublicIp" not in self.querystring
self.ec2_backend.disassociate_address( and "AssociationId" not in self.querystring
address=self._get_param("PublicIp") ):
) self.ec2_backend.raise_error(
elif "AssociationId" in self.querystring: "MissingParameter",
self.ec2_backend.disassociate_address( "Invalid request, expect PublicIp/AssociationId parameter.",
association_id=self._get_param("AssociationId") )
)
else:
self.ec2_backend.raise_error(
"MissingParameter",
"Invalid request, expect PublicIp/AssociationId parameter.",
)
return self.response_template(DISASSOCIATE_ADDRESS_RESPONSE).render() self.error_on_dryrun()
if "PublicIp" in self.querystring:
self.ec2_backend.disassociate_address(address=self._get_param("PublicIp"))
elif "AssociationId" in self.querystring:
self.ec2_backend.disassociate_address(
association_id=self._get_param("AssociationId")
)
return self.response_template(DISASSOCIATE_ADDRESS_RESPONSE).render()
def release_address(self): def release_address(self):
if self.is_not_dryrun("ReleaseAddress"): if (
if "PublicIp" in self.querystring: "PublicIp" not in self.querystring
self.ec2_backend.release_address(address=self._get_param("PublicIp")) and "AllocationId" not in self.querystring
elif "AllocationId" in self.querystring: ):
self.ec2_backend.release_address( self.ec2_backend.raise_error(
allocation_id=self._get_param("AllocationId") "MissingParameter",
) "Invalid request, expect PublicIp/AllocationId parameter.",
else: )
self.ec2_backend.raise_error(
"MissingParameter",
"Invalid request, expect PublicIp/AllocationId parameter.",
)
return self.response_template(RELEASE_ADDRESS_RESPONSE).render() self.error_on_dryrun()
if "PublicIp" in self.querystring:
self.ec2_backend.release_address(address=self._get_param("PublicIp"))
elif "AllocationId" in self.querystring:
self.ec2_backend.release_address(
allocation_id=self._get_param("AllocationId")
)
return self.response_template(RELEASE_ADDRESS_RESPONSE).render()
ALLOCATE_ADDRESS_RESPONSE = """<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> ALLOCATE_ADDRESS_RESPONSE = """<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -17,96 +17,110 @@ class ElasticNetworkInterfaces(EC2BaseResponse):
tags = self._get_multi_param("TagSpecification") tags = self._get_multi_param("TagSpecification")
tags = add_tag_specification(tags) tags = add_tag_specification(tags)
if self.is_not_dryrun("CreateNetworkInterface"): self.error_on_dryrun()
eni = self.ec2_backend.create_network_interface(
subnet, eni = self.ec2_backend.create_network_interface(
private_ip_address, subnet,
private_ip_addresses, private_ip_address,
groups, private_ip_addresses,
description, groups,
tags, description,
secondary_ips_count=secondary_ips_count, tags,
ipv6_addresses=ipv6_addresses, secondary_ips_count=secondary_ips_count,
ipv6_address_count=ipv6_address_count, ipv6_addresses=ipv6_addresses,
) ipv6_address_count=ipv6_address_count,
template = self.response_template(CREATE_NETWORK_INTERFACE_RESPONSE) )
return template.render(eni=eni) template = self.response_template(CREATE_NETWORK_INTERFACE_RESPONSE)
return template.render(eni=eni)
def delete_network_interface(self): def delete_network_interface(self):
eni_id = self._get_param("NetworkInterfaceId") eni_id = self._get_param("NetworkInterfaceId")
if self.is_not_dryrun("DeleteNetworkInterface"):
self.ec2_backend.delete_network_interface(eni_id) self.error_on_dryrun()
template = self.response_template(DELETE_NETWORK_INTERFACE_RESPONSE)
return template.render() self.ec2_backend.delete_network_interface(eni_id)
template = self.response_template(DELETE_NETWORK_INTERFACE_RESPONSE)
return template.render()
def describe_network_interface_attribute(self): def describe_network_interface_attribute(self):
eni_id = self._get_param("NetworkInterfaceId") eni_id = self._get_param("NetworkInterfaceId")
attribute = self._get_param("Attribute") attribute = self._get_param("Attribute")
if self.is_not_dryrun("DescribeNetworkInterfaceAttribute"):
eni = self.ec2_backend.get_all_network_interfaces([eni_id])[0]
if attribute == "description": self.error_on_dryrun()
template = self.response_template(
DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_DESCRIPTION eni = self.ec2_backend.get_all_network_interfaces([eni_id])[0]
)
elif attribute == "groupSet": if attribute == "description":
template = self.response_template( template = self.response_template(
DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_GROUPSET DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_DESCRIPTION
) )
elif attribute == "sourceDestCheck": elif attribute == "groupSet":
template = self.response_template( template = self.response_template(
DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_SOURCEDESTCHECK DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_GROUPSET
) )
elif attribute == "attachment": elif attribute == "sourceDestCheck":
template = self.response_template( template = self.response_template(
DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_ATTACHMENT DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_SOURCEDESTCHECK
) )
else: elif attribute == "attachment":
raise InvalidParameterValueErrorUnknownAttribute(attribute) template = self.response_template(
DESCRIBE_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE_ATTACHMENT
)
else:
raise InvalidParameterValueErrorUnknownAttribute(attribute)
return template.render(eni=eni) return template.render(eni=eni)
def describe_network_interfaces(self): def describe_network_interfaces(self):
eni_ids = self._get_multi_param("NetworkInterfaceId") eni_ids = self._get_multi_param("NetworkInterfaceId")
filters = self._filters_from_querystring() filters = self._filters_from_querystring()
if self.is_not_dryrun("DescribeNetworkInterfaces"):
enis = self.ec2_backend.get_all_network_interfaces(eni_ids, filters) self.error_on_dryrun()
template = self.response_template(DESCRIBE_NETWORK_INTERFACES_RESPONSE)
return template.render(enis=enis) enis = self.ec2_backend.get_all_network_interfaces(eni_ids, filters)
template = self.response_template(DESCRIBE_NETWORK_INTERFACES_RESPONSE)
return template.render(enis=enis)
def attach_network_interface(self): def attach_network_interface(self):
eni_id = self._get_param("NetworkInterfaceId") eni_id = self._get_param("NetworkInterfaceId")
instance_id = self._get_param("InstanceId") instance_id = self._get_param("InstanceId")
device_index = self._get_param("DeviceIndex") device_index = self._get_param("DeviceIndex")
if self.is_not_dryrun("AttachNetworkInterface"):
attachment_id = self.ec2_backend.attach_network_interface( self.error_on_dryrun()
eni_id, instance_id, device_index
) attachment_id = self.ec2_backend.attach_network_interface(
template = self.response_template(ATTACH_NETWORK_INTERFACE_RESPONSE) eni_id, instance_id, device_index
return template.render(attachment_id=attachment_id) )
template = self.response_template(ATTACH_NETWORK_INTERFACE_RESPONSE)
return template.render(attachment_id=attachment_id)
def detach_network_interface(self): def detach_network_interface(self):
attachment_id = self._get_param("AttachmentId") attachment_id = self._get_param("AttachmentId")
if self.is_not_dryrun("DetachNetworkInterface"):
self.ec2_backend.detach_network_interface(attachment_id) self.error_on_dryrun()
template = self.response_template(DETACH_NETWORK_INTERFACE_RESPONSE)
return template.render() self.ec2_backend.detach_network_interface(attachment_id)
template = self.response_template(DETACH_NETWORK_INTERFACE_RESPONSE)
return template.render()
def modify_network_interface_attribute(self): def modify_network_interface_attribute(self):
eni_id = self._get_param("NetworkInterfaceId") eni_id = self._get_param("NetworkInterfaceId")
group_ids = self._get_multi_param("SecurityGroupId") group_ids = self._get_multi_param("SecurityGroupId")
source_dest_check = get_attribute_value("SourceDestCheck", self.querystring) source_dest_check = get_attribute_value("SourceDestCheck", self.querystring)
description = get_attribute_value("Description", self.querystring) description = get_attribute_value("Description", self.querystring)
if self.is_not_dryrun("ModifyNetworkInterface"):
self.ec2_backend.modify_network_interface_attribute( self.error_on_dryrun()
eni_id, group_ids, source_dest_check, description
) self.ec2_backend.modify_network_interface_attribute(
return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE eni_id, group_ids, source_dest_check, description
)
return MODIFY_NETWORK_INTERFACE_ATTRIBUTE_RESPONSE
def reset_network_interface_attribute(self): def reset_network_interface_attribute(self):
if self.is_not_dryrun("ResetNetworkInterface"): self.error_on_dryrun()
raise NotImplementedError(
"ElasticNetworkInterfaces(AmazonVPC).reset_network_interface_attribute is not yet implemented" raise NotImplementedError(
) "ElasticNetworkInterfaces(AmazonVPC).reset_network_interface_attribute is not yet implemented"
)
def assign_private_ip_addresses(self): def assign_private_ip_addresses(self):
eni_id = self._get_param("NetworkInterfaceId") eni_id = self._get_param("NetworkInterfaceId")

View File

@ -17,37 +17,42 @@ class FlowLogs(EC2BaseResponse):
tags = self._parse_tag_specification() tags = self._parse_tag_specification()
tags = tags.get("vpc-flow-log", {}) tags = tags.get("vpc-flow-log", {})
if self.is_not_dryrun("CreateFlowLogs"):
flow_logs, errors = self.ec2_backend.create_flow_logs( self.error_on_dryrun()
resource_type=resource_type,
resource_ids=resource_ids, flow_logs, errors = self.ec2_backend.create_flow_logs(
traffic_type=traffic_type, resource_type=resource_type,
deliver_logs_permission_arn=deliver_logs_permission_arn, resource_ids=resource_ids,
log_destination_type=log_destination_type, traffic_type=traffic_type,
log_destination=log_destination, deliver_logs_permission_arn=deliver_logs_permission_arn,
log_group_name=log_group_name, log_destination_type=log_destination_type,
log_format=log_format, log_destination=log_destination,
max_aggregation_interval=max_aggregation_interval, log_group_name=log_group_name,
) log_format=log_format,
for fl in flow_logs: max_aggregation_interval=max_aggregation_interval,
fl.add_tags(tags) )
template = self.response_template(CREATE_FLOW_LOGS_RESPONSE) for fl in flow_logs:
return template.render(flow_logs=flow_logs, errors=errors) fl.add_tags(tags)
template = self.response_template(CREATE_FLOW_LOGS_RESPONSE)
return template.render(flow_logs=flow_logs, errors=errors)
def describe_flow_logs(self): def describe_flow_logs(self):
flow_log_ids = self._get_multi_param("FlowLogId") flow_log_ids = self._get_multi_param("FlowLogId")
filters = self._filters_from_querystring() filters = self._filters_from_querystring()
flow_logs = self.ec2_backend.describe_flow_logs(flow_log_ids, filters) flow_logs = self.ec2_backend.describe_flow_logs(flow_log_ids, filters)
if self.is_not_dryrun("DescribeFlowLogs"):
template = self.response_template(DESCRIBE_FLOW_LOGS_RESPONSE) self.error_on_dryrun()
return template.render(flow_logs=flow_logs)
template = self.response_template(DESCRIBE_FLOW_LOGS_RESPONSE)
return template.render(flow_logs=flow_logs)
def delete_flow_logs(self): def delete_flow_logs(self):
flow_log_ids = self._get_multi_param("FlowLogId") flow_log_ids = self._get_multi_param("FlowLogId")
self.error_on_dryrun()
self.ec2_backend.delete_flow_logs(flow_log_ids) self.ec2_backend.delete_flow_logs(flow_log_ids)
if self.is_not_dryrun("DeleteFlowLogs"): return self.response_template(DELETE_FLOW_LOGS_RESPONSE).render()
template = self.response_template(DELETE_FLOW_LOGS_RESPONSE)
return template.render()
CREATE_FLOW_LOGS_RESPONSE = """ CREATE_FLOW_LOGS_RESPONSE = """

View File

@ -87,62 +87,70 @@ class InstanceResponse(EC2BaseResponse):
if mappings: if mappings:
kwargs["block_device_mappings"] = mappings kwargs["block_device_mappings"] = mappings
if self.is_not_dryrun("RunInstance"): self.error_on_dryrun()
new_reservation = self.ec2_backend.add_instances(
image_id, min_count, user_data, security_group_names, **kwargs
)
if kwargs.get("iam_instance_profile_name"):
self.ec2_backend.associate_iam_instance_profile(
instance_id=new_reservation.instances[0].id,
iam_instance_profile_name=kwargs.get("iam_instance_profile_name"),
)
if kwargs.get("iam_instance_profile_arn"):
self.ec2_backend.associate_iam_instance_profile(
instance_id=new_reservation.instances[0].id,
iam_instance_profile_arn=kwargs.get("iam_instance_profile_arn"),
)
template = self.response_template(EC2_RUN_INSTANCES) new_reservation = self.ec2_backend.add_instances(
return template.render( image_id, min_count, user_data, security_group_names, **kwargs
account_id=self.current_account, reservation=new_reservation )
if kwargs.get("iam_instance_profile_name"):
self.ec2_backend.associate_iam_instance_profile(
instance_id=new_reservation.instances[0].id,
iam_instance_profile_name=kwargs.get("iam_instance_profile_name"),
) )
if kwargs.get("iam_instance_profile_arn"):
self.ec2_backend.associate_iam_instance_profile(
instance_id=new_reservation.instances[0].id,
iam_instance_profile_arn=kwargs.get("iam_instance_profile_arn"),
)
template = self.response_template(EC2_RUN_INSTANCES)
return template.render(
account_id=self.current_account, reservation=new_reservation
)
def terminate_instances(self): def terminate_instances(self):
instance_ids = self._get_multi_param("InstanceId") instance_ids = self._get_multi_param("InstanceId")
if self.is_not_dryrun("TerminateInstance"):
instances = self.ec2_backend.terminate_instances(instance_ids)
from moto.autoscaling import autoscaling_backends
from moto.elbv2 import elbv2_backends
autoscaling_backends[self.current_account][ self.error_on_dryrun()
self.region
].notify_terminate_instances(instance_ids) instances = self.ec2_backend.terminate_instances(instance_ids)
elbv2_backends[self.current_account][ from moto.autoscaling import autoscaling_backends
self.region from moto.elbv2 import elbv2_backends
].notify_terminate_instances(instance_ids)
template = self.response_template(EC2_TERMINATE_INSTANCES) autoscaling_backends[self.current_account][
return template.render(instances=instances) self.region
].notify_terminate_instances(instance_ids)
elbv2_backends[self.current_account][self.region].notify_terminate_instances(
instance_ids
)
template = self.response_template(EC2_TERMINATE_INSTANCES)
return template.render(instances=instances)
def reboot_instances(self): def reboot_instances(self):
instance_ids = self._get_multi_param("InstanceId") instance_ids = self._get_multi_param("InstanceId")
if self.is_not_dryrun("RebootInstance"):
instances = self.ec2_backend.reboot_instances(instance_ids) self.error_on_dryrun()
template = self.response_template(EC2_REBOOT_INSTANCES)
return template.render(instances=instances) instances = self.ec2_backend.reboot_instances(instance_ids)
template = self.response_template(EC2_REBOOT_INSTANCES)
return template.render(instances=instances)
def stop_instances(self): def stop_instances(self):
instance_ids = self._get_multi_param("InstanceId") instance_ids = self._get_multi_param("InstanceId")
if self.is_not_dryrun("StopInstance"):
instances = self.ec2_backend.stop_instances(instance_ids) self.error_on_dryrun()
template = self.response_template(EC2_STOP_INSTANCES)
return template.render(instances=instances) instances = self.ec2_backend.stop_instances(instance_ids)
template = self.response_template(EC2_STOP_INSTANCES)
return template.render(instances=instances)
def start_instances(self): def start_instances(self):
instance_ids = self._get_multi_param("InstanceId") instance_ids = self._get_multi_param("InstanceId")
if self.is_not_dryrun("StartInstance"): self.error_on_dryrun()
instances = self.ec2_backend.start_instances(instance_ids)
template = self.response_template(EC2_START_INSTANCES) instances = self.ec2_backend.start_instances(instance_ids)
return template.render(instances=instances) template = self.response_template(EC2_START_INSTANCES)
return template.render(instances=instances)
def _get_list_of_dict_params(self, param_prefix, _dct): def _get_list_of_dict_params(self, param_prefix, _dct):
""" """
@ -268,9 +276,10 @@ class InstanceResponse(EC2BaseResponse):
instance_id = self._get_param("InstanceId") instance_id = self._get_param("InstanceId")
instance = self.ec2_backend.get_instance(instance_id) instance = self.ec2_backend.get_instance(instance_id)
if self.is_not_dryrun("ModifyInstanceAttribute"): self.error_on_dryrun()
block_device_type = instance.block_device_mapping[device_name_value]
block_device_type.delete_on_termination = del_on_term_value block_device_type = instance.block_device_mapping[device_name_value]
block_device_type.delete_on_termination = del_on_term_value
# +1 for the next device # +1 for the next device
mapping_counter += 1 mapping_counter += 1
@ -288,14 +297,15 @@ class InstanceResponse(EC2BaseResponse):
if not attribute_key: if not attribute_key:
return return
if self.is_not_dryrun("Modify" + attribute_key.split(".")[0]): self.error_on_dryrun()
value = self.querystring.get(attribute_key)[0]
normalized_attribute = camelcase_to_underscores(attribute_key.split(".")[0]) value = self.querystring.get(attribute_key)[0]
instance_id = self._get_param("InstanceId") normalized_attribute = camelcase_to_underscores(attribute_key.split(".")[0])
self.ec2_backend.modify_instance_attribute( instance_id = self._get_param("InstanceId")
instance_id, normalized_attribute, value self.ec2_backend.modify_instance_attribute(
) instance_id, normalized_attribute, value
return EC2_MODIFY_INSTANCE_ATTRIBUTE )
return EC2_MODIFY_INSTANCE_ATTRIBUTE
def _attribute_value_handler(self): def _attribute_value_handler(self):
attribute_key = self._get_param("Attribute") attribute_key = self._get_param("Attribute")
@ -303,14 +313,15 @@ class InstanceResponse(EC2BaseResponse):
if attribute_key is None: if attribute_key is None:
return return
if self.is_not_dryrun("ModifyInstanceAttribute"): self.error_on_dryrun()
value = self._get_param("Value")
normalized_attribute = camelcase_to_underscores(attribute_key) value = self._get_param("Value")
instance_id = self._get_param("InstanceId") normalized_attribute = camelcase_to_underscores(attribute_key)
self.ec2_backend.modify_instance_attribute( instance_id = self._get_param("InstanceId")
instance_id, normalized_attribute, value self.ec2_backend.modify_instance_attribute(
) instance_id, normalized_attribute, value
return EC2_MODIFY_INSTANCE_ATTRIBUTE )
return EC2_MODIFY_INSTANCE_ATTRIBUTE
def _security_grp_instance_attribute_handler(self): def _security_grp_instance_attribute_handler(self):
new_security_grp_list = [] new_security_grp_list = []
@ -319,11 +330,12 @@ class InstanceResponse(EC2BaseResponse):
new_security_grp_list.append(self.querystring.get(key)[0]) new_security_grp_list.append(self.querystring.get(key)[0])
instance_id = self._get_param("InstanceId") instance_id = self._get_param("InstanceId")
if self.is_not_dryrun("ModifyInstanceSecurityGroups"): self.error_on_dryrun()
self.ec2_backend.modify_instance_security_groups(
instance_id, new_security_grp_list self.ec2_backend.modify_instance_security_groups(
) instance_id, new_security_grp_list
return EC2_MODIFY_INSTANCE_ATTRIBUTE )
return EC2_MODIFY_INSTANCE_ATTRIBUTE
def _parse_block_device_mapping(self): def _parse_block_device_mapping(self):
device_mappings = self._get_list_prefix("BlockDeviceMapping") device_mappings = self._get_list_prefix("BlockDeviceMapping")

View File

@ -5,28 +5,29 @@ class InternetGateways(EC2BaseResponse):
def attach_internet_gateway(self): def attach_internet_gateway(self):
igw_id = self._get_param("InternetGatewayId") igw_id = self._get_param("InternetGatewayId")
vpc_id = self._get_param("VpcId") vpc_id = self._get_param("VpcId")
if self.is_not_dryrun("AttachInternetGateway"):
self.ec2_backend.attach_internet_gateway(igw_id, vpc_id) self.error_on_dryrun()
template = self.response_template(ATTACH_INTERNET_GATEWAY_RESPONSE)
return template.render() self.ec2_backend.attach_internet_gateway(igw_id, vpc_id)
return self.response_template(ATTACH_INTERNET_GATEWAY_RESPONSE).render()
def create_internet_gateway(self): def create_internet_gateway(self):
if self.is_not_dryrun("CreateInternetGateway"): self.error_on_dryrun()
tags = self._get_multi_param(
"TagSpecification", skip_result_conversion=True tags = self._get_multi_param("TagSpecification", skip_result_conversion=True)
) if tags:
if tags: tags = tags[0].get("Tag") or []
tags = tags[0].get("Tag") or [] igw = self.ec2_backend.create_internet_gateway(tags=tags)
igw = self.ec2_backend.create_internet_gateway(tags=tags) return self.response_template(CREATE_INTERNET_GATEWAY_RESPONSE).render(
template = self.response_template(CREATE_INTERNET_GATEWAY_RESPONSE) internet_gateway=igw
return template.render(internet_gateway=igw) )
def delete_internet_gateway(self): def delete_internet_gateway(self):
igw_id = self._get_param("InternetGatewayId") igw_id = self._get_param("InternetGatewayId")
if self.is_not_dryrun("DeleteInternetGateway"): self.error_on_dryrun()
self.ec2_backend.delete_internet_gateway(igw_id)
template = self.response_template(DELETE_INTERNET_GATEWAY_RESPONSE) self.ec2_backend.delete_internet_gateway(igw_id)
return template.render() return self.response_template(DELETE_INTERNET_GATEWAY_RESPONSE).render()
def describe_internet_gateways(self): def describe_internet_gateways(self):
filter_dict = self._filters_from_querystring() filter_dict = self._filters_from_querystring()
@ -46,10 +47,10 @@ class InternetGateways(EC2BaseResponse):
# raise else DependencyViolationError() # raise else DependencyViolationError()
igw_id = self._get_param("InternetGatewayId") igw_id = self._get_param("InternetGatewayId")
vpc_id = self._get_param("VpcId") vpc_id = self._get_param("VpcId")
if self.is_not_dryrun("DetachInternetGateway"): self.error_on_dryrun()
self.ec2_backend.detach_internet_gateway(igw_id, vpc_id)
template = self.response_template(DETACH_INTERNET_GATEWAY_RESPONSE) self.ec2_backend.detach_internet_gateway(igw_id, vpc_id)
return template.render() return self.response_template(DETACH_INTERNET_GATEWAY_RESPONSE).render()
ATTACH_INTERNET_GATEWAY_RESPONSE = """<AttachInternetGatewayResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> ATTACH_INTERNET_GATEWAY_RESPONSE = """<AttachInternetGatewayResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -3,13 +3,15 @@ from moto.core.responses import BaseResponse
class IPAddresses(BaseResponse): class IPAddresses(BaseResponse):
def assign_private_ip_addresses(self): def assign_private_ip_addresses(self):
if self.is_not_dryrun("AssignPrivateIPAddress"): self.error_on_dryrun()
raise NotImplementedError(
"IPAddresses.assign_private_ip_addresses is not yet implemented" raise NotImplementedError(
) "IPAddresses.assign_private_ip_addresses is not yet implemented"
)
def unassign_private_ip_addresses(self): def unassign_private_ip_addresses(self):
if self.is_not_dryrun("UnAssignPrivateIPAddress"): self.error_on_dryrun()
raise NotImplementedError(
"IPAddresses.unassign_private_ip_addresses is not yet implemented" raise NotImplementedError(
) "IPAddresses.unassign_private_ip_addresses is not yet implemented"
)

View File

@ -4,16 +4,17 @@ from ._base_response import EC2BaseResponse
class KeyPairs(EC2BaseResponse): class KeyPairs(EC2BaseResponse):
def create_key_pair(self): def create_key_pair(self):
name = self._get_param("KeyName") name = self._get_param("KeyName")
if self.is_not_dryrun("CreateKeyPair"): self.error_on_dryrun()
keypair = self.ec2_backend.create_key_pair(name)
template = self.response_template(CREATE_KEY_PAIR_RESPONSE) keypair = self.ec2_backend.create_key_pair(name)
return template.render(keypair=keypair) return self.response_template(CREATE_KEY_PAIR_RESPONSE).render(keypair=keypair)
def delete_key_pair(self): def delete_key_pair(self):
name = self._get_param("KeyName") name = self._get_param("KeyName")
if self.is_not_dryrun("DeleteKeyPair"): self.error_on_dryrun()
self.ec2_backend.delete_key_pair(name)
return self.response_template(DELETE_KEY_PAIR_RESPONSE).render() self.ec2_backend.delete_key_pair(name)
return self.response_template(DELETE_KEY_PAIR_RESPONSE).render()
def describe_key_pairs(self): def describe_key_pairs(self):
names = self._get_multi_param("KeyName") names = self._get_multi_param("KeyName")
@ -25,10 +26,10 @@ class KeyPairs(EC2BaseResponse):
def import_key_pair(self): def import_key_pair(self):
name = self._get_param("KeyName") name = self._get_param("KeyName")
material = self._get_param("PublicKeyMaterial") material = self._get_param("PublicKeyMaterial")
if self.is_not_dryrun("ImportKeyPair"): self.error_on_dryrun()
keypair = self.ec2_backend.import_key_pair(name, material)
template = self.response_template(IMPORT_KEYPAIR_RESPONSE) keypair = self.ec2_backend.import_key_pair(name, material)
return template.render(keypair=keypair) return self.response_template(IMPORT_KEYPAIR_RESPONSE).render(keypair=keypair)
DESCRIBE_KEY_PAIRS_RESPONSE = """<DescribeKeyPairsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> DESCRIBE_KEY_PAIRS_RESPONSE = """<DescribeKeyPairsResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -48,95 +48,53 @@ def pretty_xml(tree):
return parsed.toprettyxml(indent=" ") return parsed.toprettyxml(indent=" ")
def parse_object(raw_data):
out_data = {}
for key, value in raw_data.items():
key_fix_splits = key.split("_")
key_len = len(key_fix_splits)
new_key = ""
for i in range(0, key_len):
new_key += key_fix_splits[i][0].upper() + key_fix_splits[i][1:]
data = out_data
splits = new_key.split(".")
for split in splits[:-1]:
if split not in data:
data[split] = {}
data = data[split]
data[splits[-1]] = value
out_data = parse_lists(out_data)
return out_data
def parse_lists(data):
for key, value in data.items():
if isinstance(value, dict):
keys = data[key].keys()
is_list = all(map(lambda k: k.isnumeric(), keys))
if is_list:
new_value = []
keys = sorted(list(keys))
for k in keys:
lvalue = value[k]
if isinstance(lvalue, dict):
lvalue = parse_lists(lvalue)
new_value.append(lvalue)
data[key] = new_value
return data
class LaunchTemplates(EC2BaseResponse): class LaunchTemplates(EC2BaseResponse):
def create_launch_template(self): def create_launch_template(self):
name = self._get_param("LaunchTemplateName") name = self._get_param("LaunchTemplateName")
version_description = self._get_param("VersionDescription") version_description = self._get_param("VersionDescription")
tag_spec = self._parse_tag_specification() tag_spec = self._parse_tag_specification()
raw_template_data = self._get_dict_param("LaunchTemplateData.") parsed_template_data = self._get_multi_param_dict("LaunchTemplateData")
parsed_template_data = parse_object(raw_template_data)
if self.is_not_dryrun("CreateLaunchTemplate"): self.error_on_dryrun()
if tag_spec:
if "TagSpecifications" not in parsed_template_data:
parsed_template_data["TagSpecifications"] = []
converted_tag_spec = []
for resource_type, tags in tag_spec.items():
converted_tag_spec.append(
{
"ResourceType": resource_type,
"Tags": [
{"Key": key, "Value": value}
for key, value in tags.items()
],
}
)
parsed_template_data["TagSpecifications"].extend(converted_tag_spec) if tag_spec:
if "TagSpecifications" not in parsed_template_data:
parsed_template_data["TagSpecifications"] = []
converted_tag_spec = []
for resource_type, tags in tag_spec.items():
converted_tag_spec.append(
{
"ResourceType": resource_type,
"Tags": [
{"Key": key, "Value": value} for key, value in tags.items()
],
}
)
template = self.ec2_backend.create_launch_template( parsed_template_data["TagSpecifications"].extend(converted_tag_spec)
name, version_description, parsed_template_data, tag_spec
)
version = template.default_version()
tree = xml_root("CreateLaunchTemplateResponse") template = self.ec2_backend.create_launch_template(
xml_serialize( name, version_description, parsed_template_data, tag_spec
tree, )
"launchTemplate", version = template.default_version()
{
"createTime": version.create_time,
"createdBy": f"arn:aws:iam::{self.current_account}:root",
"defaultVersionNumber": template.default_version_number,
"latestVersionNumber": version.number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"tags": template.tags,
},
)
return pretty_xml(tree) tree = xml_root("CreateLaunchTemplateResponse")
xml_serialize(
tree,
"launchTemplate",
{
"createTime": version.create_time,
"createdBy": f"arn:aws:iam::{self.current_account}:root",
"defaultVersionNumber": template.default_version_number,
"latestVersionNumber": version.number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"tags": template.tags,
},
)
return pretty_xml(tree)
def create_launch_template_version(self): def create_launch_template_version(self):
name = self._get_param("LaunchTemplateName") name = self._get_param("LaunchTemplateName")
@ -148,51 +106,49 @@ class LaunchTemplates(EC2BaseResponse):
version_description = self._get_param("VersionDescription") version_description = self._get_param("VersionDescription")
raw_template_data = self._get_dict_param("LaunchTemplateData.") template_data = self._get_multi_param_dict("LaunchTemplateData")
template_data = parse_object(raw_template_data)
if self.is_not_dryrun("CreateLaunchTemplate"): self.error_on_dryrun()
version = template.create_version(template_data, version_description)
tree = xml_root("CreateLaunchTemplateVersionResponse") version = template.create_version(template_data, version_description)
xml_serialize(
tree, tree = xml_root("CreateLaunchTemplateVersionResponse")
"launchTemplateVersion", xml_serialize(
{ tree,
"createTime": version.create_time, "launchTemplateVersion",
"createdBy": f"arn:aws:iam::{self.current_account}:root", {
"defaultVersion": template.is_default(version), "createTime": version.create_time,
"launchTemplateData": version.data, "createdBy": f"arn:aws:iam::{self.current_account}:root",
"launchTemplateId": template.id, "defaultVersion": template.is_default(version),
"launchTemplateName": template.name, "launchTemplateData": version.data,
"versionDescription": version.description, "launchTemplateId": template.id,
"versionNumber": version.number, "launchTemplateName": template.name,
}, "versionDescription": version.description,
) "versionNumber": version.number,
return pretty_xml(tree) },
)
return pretty_xml(tree)
def delete_launch_template(self): def delete_launch_template(self):
name = self._get_param("LaunchTemplateName") name = self._get_param("LaunchTemplateName")
tid = self._get_param("LaunchTemplateId") tid = self._get_param("LaunchTemplateId")
if self.is_not_dryrun("DeleteLaunchTemplate"): self.error_on_dryrun()
template = self.ec2_backend.delete_launch_template(name, tid)
tree = xml_root("DeleteLaunchTemplatesResponse") template = self.ec2_backend.delete_launch_template(name, tid)
xml_serialize(
tree,
"launchTemplate",
{
"defaultVersionNumber": template.default_version_number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
},
)
return pretty_xml(tree) tree = xml_root("DeleteLaunchTemplatesResponse")
xml_serialize(
tree,
"launchTemplate",
{
"defaultVersionNumber": template.default_version_number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
},
)
# def delete_launch_template_versions(self): return pretty_xml(tree)
# pass
def describe_launch_template_versions(self): def describe_launch_template_versions(self):
name = self._get_param("LaunchTemplateName") name = self._get_param("LaunchTemplateName")
@ -213,57 +169,58 @@ class LaunchTemplates(EC2BaseResponse):
"all filters", "DescribeLaunchTemplateVersions" "all filters", "DescribeLaunchTemplateVersions"
) )
if self.is_not_dryrun("DescribeLaunchTemplateVersions"): self.error_on_dryrun()
tree = ElementTree.Element(
"DescribeLaunchTemplateVersionsResponse",
{"xmlns": "http://ec2.amazonaws.com/doc/2016-11-15/"},
)
request_id = ElementTree.SubElement(tree, "requestId")
request_id.text = "65cadec1-b364-4354-8ca8-4176dexample"
versions_node = ElementTree.SubElement(tree, "launchTemplateVersionSet") tree = ElementTree.Element(
"DescribeLaunchTemplateVersionsResponse",
{"xmlns": "http://ec2.amazonaws.com/doc/2016-11-15/"},
)
request_id = ElementTree.SubElement(tree, "requestId")
request_id.text = "65cadec1-b364-4354-8ca8-4176dexample"
ret_versions = [] versions_node = ElementTree.SubElement(tree, "launchTemplateVersionSet")
if versions:
for v in versions: ret_versions = []
if str(v).lower() == "$latest" or "$default": if versions:
tv = template.get_version(v) for v in versions:
else: if str(v).lower() == "$latest" or "$default":
tv = template.get_version(int(v)) tv = template.get_version(v)
ret_versions.append(tv)
elif min_version:
if max_version:
vMax = max_version
else: else:
vMax = min_version + max_results tv = template.get_version(int(v))
ret_versions.append(tv)
vMin = min_version - 1 elif min_version:
ret_versions = template.versions[vMin:vMax] if max_version:
elif max_version:
vMax = max_version vMax = max_version
ret_versions = template.versions[:vMax]
else: else:
ret_versions = template.versions vMax = min_version + max_results
ret_versions = ret_versions[:max_results] vMin = min_version - 1
ret_versions = template.versions[vMin:vMax]
elif max_version:
vMax = max_version
ret_versions = template.versions[:vMax]
else:
ret_versions = template.versions
for version in ret_versions: ret_versions = ret_versions[:max_results]
xml_serialize(
versions_node,
"item",
{
"createTime": version.create_time,
"createdBy": f"arn:aws:iam::{self.current_account}:root",
"defaultVersion": True,
"launchTemplateData": version.data,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"versionDescription": version.description,
"versionNumber": version.number,
},
)
return pretty_xml(tree) for version in ret_versions:
xml_serialize(
versions_node,
"item",
{
"createTime": version.create_time,
"createdBy": f"arn:aws:iam::{self.current_account}:root",
"defaultVersion": True,
"launchTemplateData": version.data,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"versionDescription": version.description,
"versionNumber": version.number,
},
)
return pretty_xml(tree)
def describe_launch_templates(self): def describe_launch_templates(self):
max_results = self._get_int_param("MaxResults", 15) max_results = self._get_int_param("MaxResults", 15)
@ -271,34 +228,32 @@ class LaunchTemplates(EC2BaseResponse):
template_ids = self._get_multi_param("LaunchTemplateId") template_ids = self._get_multi_param("LaunchTemplateId")
filters = self._filters_from_querystring() filters = self._filters_from_querystring()
if self.is_not_dryrun("DescribeLaunchTemplates"): self.error_on_dryrun()
tree = ElementTree.Element("DescribeLaunchTemplatesResponse")
templates_node = ElementTree.SubElement(tree, "launchTemplates")
templates = self.ec2_backend.describe_launch_templates( tree = ElementTree.Element("DescribeLaunchTemplatesResponse")
template_names=template_names, templates_node = ElementTree.SubElement(tree, "launchTemplates")
template_ids=template_ids,
filters=filters, templates = self.ec2_backend.describe_launch_templates(
template_names=template_names,
template_ids=template_ids,
filters=filters,
)
templates = templates[:max_results]
for template in templates:
xml_serialize(
templates_node,
"item",
{
"createTime": template.create_time,
"createdBy": f"arn:aws:iam::{self.current_account}:root",
"defaultVersionNumber": template.default_version_number,
"latestVersionNumber": template.latest_version_number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"tags": template.tags,
},
) )
templates = templates[:max_results] return pretty_xml(tree)
for template in templates:
xml_serialize(
templates_node,
"item",
{
"createTime": template.create_time,
"createdBy": f"arn:aws:iam::{self.current_account}:root",
"defaultVersionNumber": template.default_version_number,
"latestVersionNumber": template.latest_version_number,
"launchTemplateId": template.id,
"launchTemplateName": template.name,
"tags": template.tags,
},
)
return pretty_xml(tree)
# def modify_launch_template(self):
# pass

View File

@ -3,13 +3,13 @@ from moto.core.responses import BaseResponse
class Monitoring(BaseResponse): class Monitoring(BaseResponse):
def monitor_instances(self): def monitor_instances(self):
if self.is_not_dryrun("MonitorInstances"): self.error_on_dryrun()
raise NotImplementedError(
"Monitoring.monitor_instances is not yet implemented" raise NotImplementedError("Monitoring.monitor_instances is not yet implemented")
)
def unmonitor_instances(self): def unmonitor_instances(self):
if self.is_not_dryrun("UnMonitorInstances"): self.error_on_dryrun()
raise NotImplementedError(
"Monitoring.unmonitor_instances is not yet implemented" raise NotImplementedError(
) "Monitoring.unmonitor_instances is not yet implemented"
)

View File

@ -3,16 +3,18 @@ from moto.core.responses import BaseResponse
class PlacementGroups(BaseResponse): class PlacementGroups(BaseResponse):
def create_placement_group(self): def create_placement_group(self):
if self.is_not_dryrun("CreatePlacementGroup"): self.error_on_dryrun()
raise NotImplementedError(
"PlacementGroups.create_placement_group is not yet implemented" raise NotImplementedError(
) "PlacementGroups.create_placement_group is not yet implemented"
)
def delete_placement_group(self): def delete_placement_group(self):
if self.is_not_dryrun("DeletePlacementGroup"): self.error_on_dryrun()
raise NotImplementedError(
"PlacementGroups.delete_placement_group is not yet implemented" raise NotImplementedError(
) "PlacementGroups.delete_placement_group is not yet implemented"
)
def describe_placement_groups(self): def describe_placement_groups(self):
raise NotImplementedError( raise NotImplementedError(

View File

@ -3,16 +3,18 @@ from moto.core.responses import BaseResponse
class ReservedInstances(BaseResponse): class ReservedInstances(BaseResponse):
def cancel_reserved_instances_listing(self): def cancel_reserved_instances_listing(self):
if self.is_not_dryrun("CancelReservedInstances"): self.error_on_dryrun()
raise NotImplementedError(
"ReservedInstances.cancel_reserved_instances_listing is not yet implemented" raise NotImplementedError(
) "ReservedInstances.cancel_reserved_instances_listing is not yet implemented"
)
def create_reserved_instances_listing(self): def create_reserved_instances_listing(self):
if self.is_not_dryrun("CreateReservedInstances"): self.error_on_dryrun()
raise NotImplementedError(
"ReservedInstances.create_reserved_instances_listing is not yet implemented" raise NotImplementedError(
) "ReservedInstances.create_reserved_instances_listing is not yet implemented"
)
def describe_reserved_instances(self): def describe_reserved_instances(self):
raise NotImplementedError( raise NotImplementedError(
@ -30,7 +32,8 @@ class ReservedInstances(BaseResponse):
) )
def purchase_reserved_instances_offering(self): def purchase_reserved_instances_offering(self):
if self.is_not_dryrun("PurchaseReservedInstances"): self.error_on_dryrun()
raise NotImplementedError(
"ReservedInstances.purchase_reserved_instances_offering is not yet implemented" raise NotImplementedError(
) "ReservedInstances.purchase_reserved_instances_offering is not yet implemented"
)

View File

@ -128,20 +128,22 @@ class SecurityGroups(EC2BaseResponse):
) )
def authorize_security_group_egress(self): def authorize_security_group_egress(self):
if self.is_not_dryrun("GrantSecurityGroupEgress"): self.error_on_dryrun()
for args in self._process_rules_from_querystring():
rule, group = self.ec2_backend.authorize_security_group_egress(*args) for args in self._process_rules_from_querystring():
self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy() rule, group = self.ec2_backend.authorize_security_group_egress(*args)
template = self.response_template(AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE) self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy()
return template.render(rule=rule, group=group) template = self.response_template(AUTHORIZE_SECURITY_GROUP_EGRESS_RESPONSE)
return template.render(rule=rule, group=group)
def authorize_security_group_ingress(self): def authorize_security_group_ingress(self):
if self.is_not_dryrun("GrantSecurityGroupIngress"): self.error_on_dryrun()
for args in self._process_rules_from_querystring():
rule, group = self.ec2_backend.authorize_security_group_ingress(*args) for args in self._process_rules_from_querystring():
self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy() rule, group = self.ec2_backend.authorize_security_group_ingress(*args)
template = self.response_template(AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE) self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy()
return template.render(rule=rule, group=group) template = self.response_template(AUTHORIZE_SECURITY_GROUP_INGRESS_RESPONSE)
return template.render(rule=rule, group=group)
def create_security_group(self): def create_security_group(self):
name = self._get_param("GroupName") name = self._get_param("GroupName")
@ -152,19 +154,16 @@ class SecurityGroups(EC2BaseResponse):
tags = (tags or {}).get("Tag", []) tags = (tags or {}).get("Tag", [])
tags = {t["Key"]: t["Value"] for t in tags} tags = {t["Key"]: t["Value"] for t in tags}
if self.is_not_dryrun("CreateSecurityGroup"): self.error_on_dryrun()
group = self.ec2_backend.create_security_group(
name, description, vpc_id=vpc_id, tags=tags group = self.ec2_backend.create_security_group(
) name, description, vpc_id=vpc_id, tags=tags
if group: )
self.ec2_backend.sg_old_ingress_ruls[ if group:
group.id self.ec2_backend.sg_old_ingress_ruls[group.id] = group.ingress_rules.copy()
] = group.ingress_rules.copy() self.ec2_backend.sg_old_egress_ruls[group.id] = group.egress_rules.copy()
self.ec2_backend.sg_old_egress_ruls[ template = self.response_template(CREATE_SECURITY_GROUP_RESPONSE)
group.id return template.render(group=group)
] = group.egress_rules.copy()
template = self.response_template(CREATE_SECURITY_GROUP_RESPONSE)
return template.render(group=group)
def delete_security_group(self): def delete_security_group(self):
# TODO this should raise an error if there are instances in the group. # TODO this should raise an error if there are instances in the group.
@ -174,13 +173,14 @@ class SecurityGroups(EC2BaseResponse):
name = self._get_param("GroupName") name = self._get_param("GroupName")
sg_id = self._get_param("GroupId") sg_id = self._get_param("GroupId")
if self.is_not_dryrun("DeleteSecurityGroup"): self.error_on_dryrun()
if name:
self.ec2_backend.delete_security_group(name)
elif sg_id:
self.ec2_backend.delete_security_group(group_id=sg_id)
return DELETE_GROUP_RESPONSE if name:
self.ec2_backend.delete_security_group(name)
elif sg_id:
self.ec2_backend.delete_security_group(group_id=sg_id)
return DELETE_GROUP_RESPONSE
def describe_security_groups(self): def describe_security_groups(self):
groupnames = self._get_multi_param("GroupName") groupnames = self._get_multi_param("GroupName")
@ -197,22 +197,26 @@ class SecurityGroups(EC2BaseResponse):
def describe_security_group_rules(self): def describe_security_group_rules(self):
group_id = self._get_param("GroupId") group_id = self._get_param("GroupId")
filters = self._get_param("Filter") filters = self._get_param("Filter")
if self.is_not_dryrun("DescribeSecurityGroups"):
rules = self.ec2_backend.describe_security_group_rules(group_id, filters) self.error_on_dryrun()
template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE)
return template.render(rules=rules) rules = self.ec2_backend.describe_security_group_rules(group_id, filters)
template = self.response_template(DESCRIBE_SECURITY_GROUP_RULES_RESPONSE)
return template.render(rules=rules)
def revoke_security_group_egress(self): def revoke_security_group_egress(self):
if self.is_not_dryrun("RevokeSecurityGroupEgress"): self.error_on_dryrun()
for args in self._process_rules_from_querystring():
self.ec2_backend.revoke_security_group_egress(*args) for args in self._process_rules_from_querystring():
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE self.ec2_backend.revoke_security_group_egress(*args)
return REVOKE_SECURITY_GROUP_EGRESS_RESPONSE
def revoke_security_group_ingress(self): def revoke_security_group_ingress(self):
if self.is_not_dryrun("RevokeSecurityGroupIngress"): self.error_on_dryrun()
for args in self._process_rules_from_querystring():
self.ec2_backend.revoke_security_group_ingress(*args) for args in self._process_rules_from_querystring():
return REVOKE_SECURITY_GROUP_INGRESS_RESPONSE self.ec2_backend.revoke_security_group_ingress(*args)
return REVOKE_SECURITY_GROUP_INGRESS_RESPONSE
def update_security_group_rule_descriptions_ingress(self): def update_security_group_rule_descriptions_ingress(self):
for args in self._process_rules_from_querystring(): for args in self._process_rules_from_querystring():

View File

@ -3,32 +3,29 @@ from moto.core.responses import BaseResponse
class Settings(BaseResponse): class Settings(BaseResponse):
def disable_ebs_encryption_by_default(self): def disable_ebs_encryption_by_default(self):
if self.is_not_dryrun("DisableEbsEncryptionByDefault"): self.error_on_dryrun()
self.ec2_backend.disable_ebs_encryption_by_default()
template = self.response_template( self.ec2_backend.disable_ebs_encryption_by_default()
DISABLE_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE template = self.response_template(DISABLE_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE)
) return template.render(ebsEncryptionByDefault=False).replace("False", "false")
return template.render(ebsEncryptionByDefault=False).replace(
"False", "false"
)
def enable_ebs_encryption_by_default(self): def enable_ebs_encryption_by_default(self):
if self.is_not_dryrun("EnableEbsEncryptionByDefault"): self.error_on_dryrun()
self.ec2_backend.enable_ebs_encryption_by_default()
template = self.response_template( self.ec2_backend.enable_ebs_encryption_by_default()
ENABLED_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE template = self.response_template(ENABLED_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE)
) return template.render(ebsEncryptionByDefault=True).replace("True", "true")
return template.render(ebsEncryptionByDefault=True).replace("True", "true")
def get_ebs_encryption_by_default(self): def get_ebs_encryption_by_default(self):
if self.is_not_dryrun("GetEbsEncryptionByDefault"): self.error_on_dryrun()
result = self.ec2_backend.get_ebs_encryption_by_default()
template = self.response_template(GET_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE) result = self.ec2_backend.get_ebs_encryption_by_default()
return ( template = self.response_template(GET_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE)
template.render(ebsEncryptionByDefault=result) return (
.replace("False", "false") template.render(ebsEncryptionByDefault=result)
.replace("True", "true") .replace("False", "false")
) .replace("True", "true")
)
DISABLE_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE = """<DisableEbsEncryptionByDefaultResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/"> DISABLE_EBS_ENCRYPTION_BY_DEFAULT_RESPONSE = """<DisableEbsEncryptionByDefaultResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">

View File

@ -4,22 +4,26 @@ from ._base_response import EC2BaseResponse
class SpotInstances(EC2BaseResponse): class SpotInstances(EC2BaseResponse):
def cancel_spot_instance_requests(self): def cancel_spot_instance_requests(self):
request_ids = self._get_multi_param("SpotInstanceRequestId") request_ids = self._get_multi_param("SpotInstanceRequestId")
if self.is_not_dryrun("CancelSpotInstance"):
requests = self.ec2_backend.cancel_spot_instance_requests(request_ids) self.error_on_dryrun()
template = self.response_template(CANCEL_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests) requests = self.ec2_backend.cancel_spot_instance_requests(request_ids)
template = self.response_template(CANCEL_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests)
def create_spot_datafeed_subscription(self): def create_spot_datafeed_subscription(self):
if self.is_not_dryrun("CreateSpotDatafeedSubscription"): self.error_on_dryrun()
raise NotImplementedError(
"SpotInstances.create_spot_datafeed_subscription is not yet implemented" raise NotImplementedError(
) "SpotInstances.create_spot_datafeed_subscription is not yet implemented"
)
def delete_spot_datafeed_subscription(self): def delete_spot_datafeed_subscription(self):
if self.is_not_dryrun("DeleteSpotDatafeedSubscription"): self.error_on_dryrun()
raise NotImplementedError(
"SpotInstances.delete_spot_datafeed_subscription is not yet implemented" raise NotImplementedError(
) "SpotInstances.delete_spot_datafeed_subscription is not yet implemented"
)
def describe_spot_datafeed_subscription(self): def describe_spot_datafeed_subscription(self):
raise NotImplementedError( raise NotImplementedError(
@ -67,31 +71,32 @@ class SpotInstances(EC2BaseResponse):
) )
tags = self._parse_tag_specification() tags = self._parse_tag_specification()
if self.is_not_dryrun("RequestSpotInstance"): self.error_on_dryrun()
requests = self.ec2_backend.request_spot_instances(
price=price,
image_id=image_id,
count=count,
spot_instance_type=spot_instance_type,
valid_from=valid_from,
valid_until=valid_until,
launch_group=launch_group,
availability_zone_group=availability_zone_group,
key_name=key_name,
security_groups=security_groups,
user_data=user_data,
instance_type=instance_type,
placement=placement,
kernel_id=kernel_id,
ramdisk_id=ramdisk_id,
monitoring_enabled=monitoring_enabled,
subnet_id=subnet_id,
instance_interruption_behaviour=instance_interruption_behaviour,
tags=tags,
)
template = self.response_template(REQUEST_SPOT_INSTANCES_TEMPLATE) requests = self.ec2_backend.request_spot_instances(
return template.render(requests=requests) price=price,
image_id=image_id,
count=count,
spot_instance_type=spot_instance_type,
valid_from=valid_from,
valid_until=valid_until,
launch_group=launch_group,
availability_zone_group=availability_zone_group,
key_name=key_name,
security_groups=security_groups,
user_data=user_data,
instance_type=instance_type,
placement=placement,
kernel_id=kernel_id,
ramdisk_id=ramdisk_id,
monitoring_enabled=monitoring_enabled,
subnet_id=subnet_id,
instance_interruption_behaviour=instance_interruption_behaviour,
tags=tags,
)
template = self.response_template(REQUEST_SPOT_INSTANCES_TEMPLATE)
return template.render(requests=requests)
REQUEST_SPOT_INSTANCES_TEMPLATE = """<RequestSpotInstancesResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/"> REQUEST_SPOT_INSTANCES_TEMPLATE = """<RequestSpotInstancesResponse xmlns="http://ec2.amazonaws.com/doc/2013-10-15/">

View File

@ -9,20 +9,27 @@ class TagResponse(EC2BaseResponse):
validate_resource_ids(resource_ids) validate_resource_ids(resource_ids)
self.ec2_backend.do_resources_exist(resource_ids) self.ec2_backend.do_resources_exist(resource_ids)
tags = tags_from_query_string(self.querystring) tags = tags_from_query_string(self.querystring)
if self.is_not_dryrun("CreateTags"):
self.ec2_backend.create_tags(resource_ids, tags) self.error_on_dryrun()
return CREATE_RESPONSE
self.ec2_backend.create_tags(resource_ids, tags)
return CREATE_RESPONSE
def delete_tags(self): def delete_tags(self):
resource_ids = self._get_multi_param("ResourceId") resource_ids = self._get_multi_param("ResourceId")
validate_resource_ids(resource_ids) validate_resource_ids(resource_ids)
tags = tags_from_query_string(self.querystring) tags = tags_from_query_string(self.querystring)
if self.is_not_dryrun("DeleteTags"):
self.ec2_backend.delete_tags(resource_ids, tags) self.error_on_dryrun()
return DELETE_RESPONSE
self.ec2_backend.delete_tags(resource_ids, tags)
return DELETE_RESPONSE
def describe_tags(self): def describe_tags(self):
filters = self._filters_from_querystring() filters = self._filters_from_querystring()
self.error_on_dryrun()
tags = self.ec2_backend.describe_tags(filters=filters) tags = self.ec2_backend.describe_tags(filters=filters)
template = self.response_template(DESCRIBE_RESPONSE) template = self.response_template(DESCRIBE_RESPONSE)
return template.render(tags=tags) return template.render(tags=tags)

View File

@ -90,10 +90,10 @@ class ECRResponse(BaseResponse):
) )
def batch_check_layer_availability(self): def batch_check_layer_availability(self):
if self.is_not_dryrun("BatchCheckLayerAvailability"): self.error_on_dryrun()
raise NotImplementedError( raise NotImplementedError(
"ECR.batch_check_layer_availability is not yet implemented" "ECR.batch_check_layer_availability is not yet implemented"
) )
def batch_delete_image(self): def batch_delete_image(self):
repository_str = self._get_param("repositoryName") repository_str = self._get_param("repositoryName")
@ -116,10 +116,8 @@ class ECRResponse(BaseResponse):
return json.dumps(response) return json.dumps(response)
def complete_layer_upload(self): def complete_layer_upload(self):
if self.is_not_dryrun("CompleteLayerUpload"): self.error_on_dryrun()
raise NotImplementedError( raise NotImplementedError("ECR.complete_layer_upload is not yet implemented")
"ECR.complete_layer_upload is not yet implemented"
)
def delete_repository_policy(self): def delete_repository_policy(self):
registry_id = self._get_param("registryId") registry_id = self._get_param("registryId")
@ -149,10 +147,10 @@ class ECRResponse(BaseResponse):
return json.dumps({"authorizationData": auth_data}) return json.dumps({"authorizationData": auth_data})
def get_download_url_for_layer(self): def get_download_url_for_layer(self):
if self.is_not_dryrun("GetDownloadUrlForLayer"): self.error_on_dryrun()
raise NotImplementedError( raise NotImplementedError(
"ECR.get_download_url_for_layer is not yet implemented" "ECR.get_download_url_for_layer is not yet implemented"
) )
def get_repository_policy(self): def get_repository_policy(self):
registry_id = self._get_param("registryId") registry_id = self._get_param("registryId")
@ -165,10 +163,8 @@ class ECRResponse(BaseResponse):
) )
def initiate_layer_upload(self): def initiate_layer_upload(self):
if self.is_not_dryrun("InitiateLayerUpload"): self.error_on_dryrun()
raise NotImplementedError( raise NotImplementedError("ECR.initiate_layer_upload is not yet implemented")
"ECR.initiate_layer_upload is not yet implemented"
)
def set_repository_policy(self): def set_repository_policy(self):
registry_id = self._get_param("registryId") registry_id = self._get_param("registryId")
@ -187,8 +183,8 @@ class ECRResponse(BaseResponse):
) )
def upload_layer_part(self): def upload_layer_part(self):
if self.is_not_dryrun("UploadLayerPart"): self.error_on_dryrun()
raise NotImplementedError("ECR.upload_layer_part is not yet implemented") raise NotImplementedError("ECR.upload_layer_part is not yet implemented")
def list_tags_for_resource(self): def list_tags_for_resource(self):
arn = self._get_param("resourceArn") arn = self._get_param("resourceArn")

View File

@ -155,7 +155,7 @@ def test_eip_associate_classic():
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the DisAssociateAddress operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the DisassociateAddress operation: Request would have succeeded, but DryRun flag is set"
) )
client.disassociate_address(PublicIp=eip.public_ip) client.disassociate_address(PublicIp=eip.public_ip)

View File

@ -182,7 +182,7 @@ def test_elastic_network_interfaces_modify_attribute():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the ModifyNetworkInterface operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the ModifyNetworkInterfaceAttribute operation: Request would have succeeded, but DryRun flag is set"
) )
client.modify_network_interface_attribute( client.modify_network_interface_attribute(

View File

@ -43,7 +43,7 @@ def test_instance_launch_and_terminate():
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the RunInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the RunInstances operation: Request would have succeeded, but DryRun flag is set"
) )
reservation = client.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1) reservation = client.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
@ -85,7 +85,7 @@ def test_instance_launch_and_terminate():
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the TerminateInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the TerminateInstances operation: Request would have succeeded, but DryRun flag is set"
) )
response = client.terminate_instances(InstanceIds=[instance_id]) response = client.terminate_instances(InstanceIds=[instance_id])
@ -920,7 +920,7 @@ def test_instance_start_and_stop():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the StopInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the StopInstances operation: Request would have succeeded, but DryRun flag is set"
) )
stopped_instances = client.stop_instances(InstanceIds=instance_ids)[ stopped_instances = client.stop_instances(InstanceIds=instance_ids)[
@ -939,7 +939,7 @@ def test_instance_start_and_stop():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the StartInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the StartInstances operation: Request would have succeeded, but DryRun flag is set"
) )
instance1.reload() instance1.reload()
@ -967,7 +967,7 @@ def test_instance_reboot():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the RebootInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the RebootInstances operation: Request would have succeeded, but DryRun flag is set"
) )
instance.state.should.equal({"Code": 16, "Name": "running"}) instance.state.should.equal({"Code": 16, "Name": "running"})
@ -988,7 +988,7 @@ def test_instance_attribute_instance_type():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the ModifyInstanceType operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the ModifyInstanceAttribute operation: Request would have succeeded, but DryRun flag is set"
) )
instance.modify_attribute(InstanceType={"Value": "m1.medium"}) instance.modify_attribute(InstanceType={"Value": "m1.medium"})
@ -1019,7 +1019,7 @@ def test_modify_instance_attribute_security_groups():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the ModifyInstanceSecurityGroups operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the ModifyInstanceAttribute operation: Request would have succeeded, but DryRun flag is set"
) )
instance.modify_attribute(Groups=[sg_id, sg_id2]) instance.modify_attribute(Groups=[sg_id, sg_id2])
@ -1043,7 +1043,7 @@ def test_instance_attribute_user_data():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the ModifyUserData operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the ModifyInstanceAttribute operation: Request would have succeeded, but DryRun flag is set"
) )
instance.modify_attribute(UserData={"Value": "this is my user data"}) instance.modify_attribute(UserData={"Value": "this is my user data"})
@ -1068,7 +1068,7 @@ def test_instance_attribute_source_dest_check():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the ModifySourceDestCheck operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the ModifyInstanceAttribute operation: Request would have succeeded, but DryRun flag is set"
) )
instance.modify_attribute(SourceDestCheck={"Value": False}) instance.modify_attribute(SourceDestCheck={"Value": False})
@ -1971,7 +1971,7 @@ def test_get_instance_by_security_group():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the ModifyInstanceSecurityGroups operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the ModifyInstanceAttribute operation: Request would have succeeded, but DryRun flag is set"
) )
client.modify_instance_attribute(InstanceId=instance.id, Groups=[security_group.id]) client.modify_instance_attribute(InstanceId=instance.id, Groups=[security_group.id])

View File

@ -50,6 +50,28 @@ def test_launch_template_create():
) )
@mock_ec2
def test_create_launch_template__dryrun():
cli = boto3.client("ec2", region_name="us-east-1")
template_name = str(uuid4())
with pytest.raises(ClientError) as exc:
cli.create_launch_template(
DryRun=True,
LaunchTemplateName=template_name,
LaunchTemplateData={"ImageId": "ami-abc123"},
TagSpecifications=[
{"ResourceType": "instance", "Tags": [{"Key": "key", "Value": "value"}]}
],
)
err = exc.value.response["Error"]
err.should.have.key("Code").equals("DryRunOperation")
err["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the CreateLaunchTemplate operation: Request would have succeeded, but DryRun flag is set"
)
@mock_ec2 @mock_ec2
def test_describe_launch_template_versions(): def test_describe_launch_template_versions():
template_data = { template_data = {
@ -142,6 +164,29 @@ def test_create_launch_template_version():
version["VersionNumber"].should.equal(2) version["VersionNumber"].should.equal(2)
@mock_ec2
def test_create_launch_template_version__dryrun():
cli = boto3.client("ec2", region_name="us-east-1")
template_name = str(uuid4())
cli.create_launch_template(
LaunchTemplateName=template_name, LaunchTemplateData={"ImageId": "ami-abc123"}
)
with pytest.raises(ClientError) as exc:
cli.create_launch_template_version(
DryRun=True,
LaunchTemplateName=template_name,
LaunchTemplateData={"ImageId": "ami-def456"},
VersionDescription="new ami",
)
err = exc.value.response["Error"]
err.should.have.key("Code").equals("DryRunOperation")
err["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the CreateLaunchTemplateVersion operation: Request would have succeeded, but DryRun flag is set"
)
@mock_ec2 @mock_ec2
def test_create_launch_template_version_by_id(): def test_create_launch_template_version_by_id():
cli = boto3.client("ec2", region_name="us-east-1") cli = boto3.client("ec2", region_name="us-east-1")
@ -476,6 +521,9 @@ def test_delete_launch_template__dryrun():
cli.delete_launch_template(DryRun=True, LaunchTemplateName=template_name) cli.delete_launch_template(DryRun=True, LaunchTemplateName=template_name)
err = exc.value.response["Error"] err = exc.value.response["Error"]
err.should.have.key("Code").equals("DryRunOperation") err.should.have.key("Code").equals("DryRunOperation")
err["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the DeleteLaunchTemplate operation: Request would have succeeded, but DryRun flag is set"
)
# Template still exists # Template still exists
cli.describe_launch_templates(LaunchTemplateNames=[template_name])[ cli.describe_launch_templates(LaunchTemplateNames=[template_name])[

View File

@ -224,7 +224,7 @@ def test_authorize_ip_range_and_revoke():
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the GrantSecurityGroupIngress operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the AuthorizeSecurityGroupIngress operation: Request would have succeeded, but DryRun flag is set"
) )
ingress_permissions = [ ingress_permissions = [
@ -287,7 +287,7 @@ def test_authorize_ip_range_and_revoke():
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the GrantSecurityGroupEgress operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the AuthorizeSecurityGroupEgress operation: Request would have succeeded, but DryRun flag is set"
) )
egress_security_group.authorize_egress(IpPermissions=egress_permissions) egress_security_group.authorize_egress(IpPermissions=egress_permissions)

View File

@ -57,7 +57,7 @@ def test_request_spot_instances():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the RequestSpotInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the RequestSpotInstances operation: Request would have succeeded, but DryRun flag is set"
) )
request = conn.request_spot_instances( request = conn.request_spot_instances(
@ -178,7 +178,7 @@ def test_cancel_spot_instance_request():
ex.value.response["Error"]["Code"].should.equal("DryRunOperation") ex.value.response["Error"]["Code"].should.equal("DryRunOperation")
ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412) ex.value.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(412)
ex.value.response["Error"]["Message"].should.equal( ex.value.response["Error"]["Message"].should.equal(
"An error occurred (DryRunOperation) when calling the CancelSpotInstance operation: Request would have succeeded, but DryRun flag is set" "An error occurred (DryRunOperation) when calling the CancelSpotInstanceRequests operation: Request would have succeeded, but DryRun flag is set"
) )
client.cancel_spot_instance_requests( client.cancel_spot_instance_requests(