Techdebt: Replace string-format with f-strings (for e* dirs) (#5668)

This commit is contained in:
Bert Blommers 2022-11-14 21:34:02 -01:00 committed by GitHub
parent 6b50b8020f
commit 6ab2497a12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 274 additions and 466 deletions

View File

@ -45,7 +45,7 @@ class MissingParameterError(EC2ClientError):
def __init__(self, parameter):
super().__init__(
"MissingParameter",
"The request must contain the parameter {0}".format(parameter),
f"The request must contain the parameter {parameter}",
)
@ -53,7 +53,7 @@ class InvalidDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
super().__init__(
"InvalidDhcpOptionID.NotFound",
"DhcpOptionID {0} does not exist.".format(dhcp_options_id),
f"DhcpOptionID {dhcp_options_id} does not exist.",
)
@ -71,21 +71,21 @@ class MalformedDHCPOptionsIdError(EC2ClientError):
def __init__(self, dhcp_options_id):
super().__init__(
"InvalidDhcpOptionsId.Malformed",
'Invalid id: "{0}" (expecting "dopt-...")'.format(dhcp_options_id),
f'Invalid id: "{dhcp_options_id}" (expecting "dopt-...")',
)
class InvalidKeyPairNameError(EC2ClientError):
def __init__(self, key):
super().__init__(
"InvalidKeyPair.NotFound", "The keypair '{0}' does not exist.".format(key)
"InvalidKeyPair.NotFound", f"The keypair '{key}' does not exist."
)
class InvalidKeyPairDuplicateError(EC2ClientError):
def __init__(self, key):
super().__init__(
"InvalidKeyPair.Duplicate", "The keypair '{0}' already exists.".format(key)
"InvalidKeyPair.Duplicate", f"The keypair '{key}' already exists."
)
@ -99,16 +99,13 @@ class InvalidKeyPairFormatError(EC2ClientError):
class InvalidVPCIdError(EC2ClientError):
def __init__(self, vpc_id):
super().__init__(
"InvalidVpcID.NotFound", "VpcID {0} does not exist.".format(vpc_id)
)
super().__init__("InvalidVpcID.NotFound", f"VpcID {vpc_id} does not exist.")
class InvalidSubnetIdError(EC2ClientError):
def __init__(self, subnet_id):
super().__init__(
"InvalidSubnetID.NotFound",
"The subnet ID '{}' does not exist".format(subnet_id),
"InvalidSubnetID.NotFound", f"The subnet ID '{subnet_id}' does not exist"
)
@ -116,9 +113,7 @@ class InvalidFlowLogIdError(EC2ClientError):
def __init__(self, count, flow_log_ids):
super().__init__(
"InvalidFlowLogId.NotFound",
"These flow log ids in the input list are not found: [TotalCount: {0}] {1}".format(
count, flow_log_ids
),
f"These flow log ids in the input list are not found: [TotalCount: {count}] {flow_log_ids}",
)
@ -134,7 +129,7 @@ class InvalidNetworkAclIdError(EC2ClientError):
def __init__(self, network_acl_id):
super().__init__(
"InvalidNetworkAclID.NotFound",
"The network acl ID '{0}' does not exist".format(network_acl_id),
f"The network acl ID '{network_acl_id}' does not exist",
)
@ -142,7 +137,7 @@ class InvalidVpnGatewayIdError(EC2ClientError):
def __init__(self, vpn_gw):
super().__init__(
"InvalidVpnGatewayID.NotFound",
"The virtual private gateway ID '{0}' does not exist".format(vpn_gw),
f"The virtual private gateway ID '{vpn_gw}' does not exist",
)
@ -150,9 +145,7 @@ class InvalidVpnGatewayAttachmentError(EC2ClientError):
def __init__(self, vpn_gw, vpc_id):
super().__init__(
"InvalidVpnGatewayAttachment.NotFound",
"The attachment with vpn gateway ID '{}' and vpc ID '{}' does not exist".format(
vpn_gw, vpc_id
),
f"The attachment with vpn gateway ID '{vpn_gw}' and vpc ID '{vpc_id}' does not exist",
)
@ -160,7 +153,7 @@ class InvalidVpnConnectionIdError(EC2ClientError):
def __init__(self, network_acl_id):
super().__init__(
"InvalidVpnConnectionID.NotFound",
"The vpnConnection ID '{0}' does not exist".format(network_acl_id),
f"The vpnConnection ID '{network_acl_id}' does not exist",
)
@ -168,7 +161,7 @@ class InvalidCustomerGatewayIdError(EC2ClientError):
def __init__(self, customer_gateway_id):
super().__init__(
"InvalidCustomerGatewayID.NotFound",
"The customer gateway ID '{0}' does not exist".format(customer_gateway_id),
f"The customer gateway ID '{customer_gateway_id}' does not exist",
)
@ -176,7 +169,7 @@ class InvalidNetworkInterfaceIdError(EC2ClientError):
def __init__(self, eni_id):
super().__init__(
"InvalidNetworkInterfaceID.NotFound",
"The network interface ID '{0}' does not exist".format(eni_id),
f"The network interface ID '{eni_id}' does not exist",
)
@ -184,17 +177,14 @@ class InvalidNetworkAttachmentIdError(EC2ClientError):
def __init__(self, attachment_id):
super().__init__(
"InvalidAttachmentID.NotFound",
"The network interface attachment ID '{0}' does not exist".format(
attachment_id
),
f"The network interface attachment ID '{attachment_id}' does not exist",
)
class InvalidSecurityGroupDuplicateError(EC2ClientError):
def __init__(self, name):
super().__init__(
"InvalidGroup.Duplicate",
"The security group '{0}' already exists".format(name),
"InvalidGroup.Duplicate", f"The security group '{name}' already exists"
)
@ -202,7 +192,7 @@ class InvalidSecurityGroupNotFoundError(EC2ClientError):
def __init__(self, name):
super().__init__(
"InvalidGroup.NotFound",
"The security group '{0}' does not exist".format(name),
f"The security group '{name}' does not exist",
)
@ -225,7 +215,7 @@ class InvalidRouteTableIdError(EC2ClientError):
def __init__(self, route_table_id):
super().__init__(
"InvalidRouteTableID.NotFound",
"The routeTable ID '{0}' does not exist".format(route_table_id),
f"The routeTable ID '{route_table_id}' does not exist",
)
@ -233,17 +223,14 @@ class InvalidRouteError(EC2ClientError):
def __init__(self, route_table_id, cidr):
super().__init__(
"InvalidRoute.NotFound",
"no route with destination-cidr-block {0} in route table {1}".format(
cidr, route_table_id
),
f"no route with destination-cidr-block {cidr} in route table {route_table_id}",
)
class RouteAlreadyExistsError(EC2ClientError):
def __init__(self, cidr):
super().__init__(
"RouteAlreadyExists",
"The route identified by {0} already exists".format(cidr),
"RouteAlreadyExists", f"The route identified by {cidr} already exists"
)
@ -274,7 +261,7 @@ class InvalidAMIIdError(EC2ClientError):
def __init__(self, ami_id):
super().__init__(
"InvalidAMIID.NotFound",
"The image id '[{0}]' does not exist".format(ami_id),
f"The image id '[{ami_id}]' does not exist",
)
@ -282,7 +269,7 @@ class UnvailableAMIIdError(EC2ClientError):
def __init__(self, ami_id):
super().__init__(
"InvalidAMIID.Unavailable",
"The image id '[{0}]' is no longer available".format(ami_id),
f"The image id '[{ami_id}]' is no longer available",
)
@ -290,17 +277,14 @@ class InvalidAMIAttributeItemValueError(EC2ClientError):
def __init__(self, attribute, value):
super().__init__(
"InvalidAMIAttributeItemValue",
'Invalid attribute item value "{0}" for {1} item type.'.format(
value, attribute
),
f'Invalid attribute item value "{value}" for {attribute} item type.',
)
class MalformedAMIIdError(EC2ClientError):
def __init__(self, ami_id):
super().__init__(
"InvalidAMIID.Malformed",
'Invalid id: "{0}" (expecting "ami-...")'.format(ami_id),
"InvalidAMIID.Malformed", f'Invalid id: "{ami_id}" (expecting "ami-...")'
)
@ -321,8 +305,7 @@ class InvalidSnapshotInUse(EC2ClientError):
class InvalidVolumeIdError(EC2ClientError):
def __init__(self, volume_id):
super().__init__(
"InvalidVolume.NotFound",
"The volume '{0}' does not exist.".format(volume_id),
"InvalidVolume.NotFound", f"The volume '{volume_id}' does not exist."
)
@ -330,9 +313,7 @@ class InvalidVolumeAttachmentError(EC2ClientError):
def __init__(self, volume_id, instance_id):
super().__init__(
"InvalidAttachment.NotFound",
"Volume {0} can not be detached from {1} because it is not attached".format(
volume_id, instance_id
),
f"Volume {volume_id} can not be detached from {instance_id} because it is not attached",
)
@ -340,9 +321,7 @@ class InvalidVolumeDetachmentError(EC2ClientError):
def __init__(self, volume_id, instance_id, device):
super().__init__(
"InvalidAttachment.NotFound",
"The volume {0} is not attached to instance {1} as device {2}".format(
volume_id, instance_id, device
),
f"The volume {volume_id} is not attached to instance {instance_id} as device {device}",
)
@ -350,29 +329,27 @@ class VolumeInUseError(EC2ClientError):
def __init__(self, volume_id, instance_id):
super().__init__(
"VolumeInUse",
"Volume {0} is currently attached to {1}".format(volume_id, instance_id),
f"Volume {volume_id} is currently attached to {instance_id}",
)
class InvalidDomainError(EC2ClientError):
def __init__(self, domain):
super().__init__(
"InvalidParameterValue", "Invalid value '{0}' for domain.".format(domain)
"InvalidParameterValue", f"Invalid value '{domain}' for domain."
)
class InvalidAddressError(EC2ClientError):
def __init__(self, ip):
super().__init__(
"InvalidAddress.NotFound", "Address '{0}' not found.".format(ip)
)
super().__init__("InvalidAddress.NotFound", f"Address '{ip}' not found.")
class LogDestinationNotFoundError(EC2ClientError):
def __init__(self, bucket_name):
super().__init__(
"LogDestinationNotFoundException",
"LogDestination: '{0}' does not exist.".format(bucket_name),
f"LogDestination: '{bucket_name}' does not exist.",
)
@ -380,7 +357,7 @@ class InvalidAllocationIdError(EC2ClientError):
def __init__(self, allocation_id):
super().__init__(
"InvalidAllocationID.NotFound",
"Allocation ID '{0}' not found.".format(allocation_id),
f"Allocation ID '{allocation_id}' not found.",
)
@ -388,7 +365,7 @@ class InvalidAssociationIdError(EC2ClientError):
def __init__(self, association_id):
super().__init__(
"InvalidAssociationID.NotFound",
"Association ID '{0}' not found.".format(association_id),
f"Association ID '{association_id}' not found.",
)
@ -396,9 +373,7 @@ class InvalidVpcCidrBlockAssociationIdError(EC2ClientError):
def __init__(self, association_id):
super().__init__(
"InvalidVpcCidrBlockAssociationIdError.NotFound",
"The vpc CIDR block association ID '{0}' does not exist".format(
association_id
),
f"The vpc CIDR block association ID '{association_id}' does not exist",
)
@ -406,9 +381,7 @@ class InvalidVPCPeeringConnectionIdError(EC2ClientError):
def __init__(self, vpc_peering_connection_id):
super().__init__(
"InvalidVpcPeeringConnectionId.NotFound",
"VpcPeeringConnectionID {0} does not exist.".format(
vpc_peering_connection_id
),
f"VpcPeeringConnectionID {vpc_peering_connection_id} does not exist.",
)
@ -416,9 +389,7 @@ class InvalidVPCPeeringConnectionStateTransitionError(EC2ClientError):
def __init__(self, vpc_peering_connection_id):
super().__init__(
"InvalidStateTransition",
"VpcPeeringConnectionID {0} is not in the correct state for the request.".format(
vpc_peering_connection_id
),
f"VpcPeeringConnectionID {vpc_peering_connection_id} is not in the correct state for the request.",
)
@ -444,9 +415,7 @@ class InvalidDependantParameterError(EC2ClientError):
def __init__(self, dependant_parameter, parameter, parameter_value):
super().__init__(
"InvalidParameter",
"{0} can't be empty if {1} is {2}.".format(
dependant_parameter, parameter, parameter_value
),
f"{dependant_parameter} can't be empty if {parameter} is {parameter_value}.",
)
@ -454,22 +423,20 @@ class InvalidDependantParameterTypeError(EC2ClientError):
def __init__(self, dependant_parameter, parameter_value, parameter):
super().__init__(
"InvalidParameter",
"{0} type must be {1} if {2} is provided.".format(
dependant_parameter, parameter_value, parameter
),
f"{dependant_parameter} type must be {parameter_value} if {parameter} is provided.",
)
class InvalidAggregationIntervalParameterError(EC2ClientError):
def __init__(self, parameter):
super().__init__("InvalidParameter", "Invalid {0}".format(parameter))
super().__init__("InvalidParameter", f"Invalid {parameter}")
class InvalidParameterValueError(EC2ClientError):
def __init__(self, parameter_value):
super().__init__(
"InvalidParameterValue",
"Value {0} is invalid for parameter.".format(parameter_value),
f"Value {parameter_value} is invalid for parameter.",
)
@ -492,17 +459,14 @@ class InvalidParameterValueErrorUnknownAttribute(EC2ClientError):
def __init__(self, parameter_value):
super().__init__(
"InvalidParameterValue",
"Value ({0}) for parameter attribute is invalid. Unknown attribute.".format(
parameter_value
),
f"Value ({parameter_value}) for parameter attribute is invalid. Unknown attribute.",
)
class InvalidGatewayIDError(EC2ClientError):
def __init__(self, gateway_id):
super().__init__(
"InvalidGatewayID.NotFound",
"The eigw ID '{0}' does not exist".format(gateway_id),
"InvalidGatewayID.NotFound", f"The eigw ID '{gateway_id}' does not exist"
)
@ -510,7 +474,7 @@ class InvalidInternetGatewayIdError(EC2ClientError):
def __init__(self, internet_gateway_id):
super().__init__(
"InvalidInternetGatewayID.NotFound",
"InternetGatewayID {0} does not exist.".format(internet_gateway_id),
f"InternetGatewayID {internet_gateway_id} does not exist.",
)
@ -518,9 +482,7 @@ class GatewayNotAttachedError(EC2ClientError):
def __init__(self, internet_gateway_id, vpc_id):
super().__init__(
"Gateway.NotAttached",
"InternetGatewayID {0} is not attached to a VPC {1}.".format(
internet_gateway_id, vpc_id
),
f"InternetGatewayID {internet_gateway_id} is not attached to a VPC {vpc_id}.",
)
@ -528,7 +490,7 @@ class ResourceAlreadyAssociatedError(EC2ClientError):
def __init__(self, resource_id):
super().__init__(
"Resource.AlreadyAssociated",
"Resource {0} is already associated.".format(resource_id),
f"Resource {resource_id} is already associated.",
)
@ -542,14 +504,13 @@ class TagLimitExceeded(EC2ClientError):
class InvalidID(EC2ClientError):
def __init__(self, resource_id):
super().__init__("InvalidID", "The ID '{0}' is not valid".format(resource_id))
super().__init__("InvalidID", f"The ID '{resource_id}' is not valid")
class InvalidCIDRSubnetError(EC2ClientError):
def __init__(self, cidr):
super().__init__(
"InvalidParameterValue",
"invalid CIDR subnet specification: {0}".format(cidr),
"InvalidParameterValue", f"invalid CIDR subnet specification: {cidr}"
)
@ -564,32 +525,29 @@ class RulesPerSecurityGroupLimitExceededError(EC2ClientError):
class MotoNotImplementedError(NotImplementedError):
def __init__(self, blurb):
super().__init__(
"{0} has not been implemented in Moto yet."
f"{blurb} has not been implemented in Moto yet."
" Feel free to open an issue at"
" https://github.com/spulec/moto/issues".format(blurb)
" https://github.com/spulec/moto/issues"
)
class FilterNotImplementedError(MotoNotImplementedError):
def __init__(self, filter_name, method_name):
super().__init__("The filter '{0}' for {1}".format(filter_name, method_name))
super().__init__(f"The filter '{filter_name}' for {method_name}")
class CidrLimitExceeded(EC2ClientError):
def __init__(self, vpc_id, max_cidr_limit):
super().__init__(
"CidrLimitExceeded",
"This network '{0}' has met its maximum number of allowed CIDRs: {1}".format(
vpc_id, max_cidr_limit
),
f"This network '{vpc_id}' has met its maximum number of allowed CIDRs: {max_cidr_limit}",
)
class UnsupportedTenancy(EC2ClientError):
def __init__(self, tenancy):
super().__init__(
"UnsupportedTenancy",
"The tenancy value {0} is not supported.".format(tenancy),
"UnsupportedTenancy", f"The tenancy value {tenancy} is not supported."
)
@ -597,8 +555,7 @@ class OperationNotPermitted(EC2ClientError):
def __init__(self, association_id):
super().__init__(
"OperationNotPermitted",
"The vpc CIDR block with association ID {} may not be disassociated. "
"It is the primary IPv4 CIDR block of the VPC".format(association_id),
f"The vpc CIDR block with association ID {association_id} may not be disassociated. It is the primary IPv4 CIDR block of the VPC",
)
@ -606,10 +563,8 @@ class InvalidAvailabilityZoneError(EC2ClientError):
def __init__(self, availability_zone_value, valid_availability_zones):
super().__init__(
"InvalidParameterValue",
"Value ({0}) for parameter availabilityZone is invalid. "
"Subnets can currently only be created in the following availability zones: {1}.".format(
availability_zone_value, valid_availability_zones
),
f"Value ({availability_zone_value}) for parameter availabilityZone is invalid. "
f"Subnets can currently only be created in the following availability zones: {valid_availability_zones}.",
)
@ -617,7 +572,7 @@ class AvailabilityZoneNotFromRegionError(EC2ClientError):
def __init__(self, availability_zone_value):
super().__init__(
"InvalidParameterValue",
"Invalid Availability Zone ({0})".format(availability_zone_value),
f"Invalid Availability Zone ({availability_zone_value})",
)
@ -625,26 +580,20 @@ class NetworkAclEntryAlreadyExistsError(EC2ClientError):
def __init__(self, rule_number):
super().__init__(
"NetworkAclEntryAlreadyExists",
"The network acl entry identified by {} already exists.".format(
rule_number
),
f"The network acl entry identified by {rule_number} already exists.",
)
class InvalidSubnetRangeError(EC2ClientError):
def __init__(self, cidr_block):
super().__init__(
"InvalidSubnet.Range", "The CIDR '{}' is invalid.".format(cidr_block)
)
super().__init__("InvalidSubnet.Range", f"The CIDR '{cidr_block}' is invalid.")
class InvalidCIDRBlockParameterError(EC2ClientError):
def __init__(self, cidr_block):
super().__init__(
"InvalidParameterValue",
"Value ({}) for parameter cidrBlock is invalid. This is not a valid CIDR block.".format(
cidr_block
),
f"Value ({cidr_block}) for parameter cidrBlock is invalid. This is not a valid CIDR block.",
)
@ -652,9 +601,7 @@ class InvalidDestinationCIDRBlockParameterError(EC2ClientError):
def __init__(self, cidr_block):
super().__init__(
"InvalidParameterValue",
"Value ({}) for parameter destinationCidrBlock is invalid. This is not a valid CIDR block.".format(
cidr_block
),
f"Value ({cidr_block}) for parameter destinationCidrBlock is invalid. This is not a valid CIDR block.",
)
@ -662,15 +609,13 @@ class InvalidSubnetConflictError(EC2ClientError):
def __init__(self, cidr_block):
super().__init__(
"InvalidSubnet.Conflict",
"The CIDR '{}' conflicts with another subnet".format(cidr_block),
f"The CIDR '{cidr_block}' conflicts with another subnet",
)
class InvalidVPCRangeError(EC2ClientError):
def __init__(self, cidr_block):
super().__init__(
"InvalidVpc.Range", "The CIDR '{}' is invalid.".format(cidr_block)
)
super().__init__("InvalidVpc.Range", f"The CIDR '{cidr_block}' is invalid.")
# accept exception
@ -678,10 +623,7 @@ class OperationNotPermitted2(EC2ClientError):
def __init__(self, client_region, pcx_id, acceptor_region):
super().__init__(
"OperationNotPermitted",
"Incorrect region ({0}) specified for this request."
"VPC peering connection {1} must be accepted in region {2}".format(
client_region, pcx_id, acceptor_region
),
f"Incorrect region ({client_region}) specified for this request.VPC peering connection {pcx_id} must be accepted in region {acceptor_region}",
)
@ -690,10 +632,7 @@ class OperationNotPermitted3(EC2ClientError):
def __init__(self, client_region, pcx_id, acceptor_region):
super().__init__(
"OperationNotPermitted",
"Incorrect region ({0}) specified for this request."
"VPC peering connection {1} must be accepted or rejected in region {2}".format(
client_region, pcx_id, acceptor_region
),
f"Incorrect region ({client_region}) specified for this request.VPC peering connection {pcx_id} must be accepted or rejected in region {acceptor_region}",
)
@ -701,8 +640,7 @@ class OperationNotPermitted4(EC2ClientError):
def __init__(self, instance_id):
super().__init__(
"OperationNotPermitted",
"The instance '{0}' may not be terminated. Modify its 'disableApiTermination' "
"instance attribute and try again.".format(instance_id),
f"The instance '{instance_id}' may not be terminated. Modify its 'disableApiTermination' instance attribute and try again.",
)
@ -734,9 +672,7 @@ class InvalidParameterDependency(EC2ClientError):
def __init__(self, param, param_needed):
super().__init__(
"InvalidParameterDependency",
"The parameter [{0}] requires the parameter {1} to be set.".format(
param, param_needed
),
f"The parameter [{param}] requires the parameter {param_needed} to be set.",
)
@ -744,7 +680,7 @@ class IncorrectStateIamProfileAssociationError(EC2ClientError):
def __init__(self, instance_id):
super().__init__(
"IncorrectState",
"There is an existing association for instance {0}".format(instance_id),
f"There is an existing association for instance {instance_id}",
)
@ -752,7 +688,7 @@ class InvalidAssociationIDIamProfileAssociationError(EC2ClientError):
def __init__(self, association_id):
super().__init__(
"InvalidAssociationID.NotFound",
"An invalid association-id of '{0}' was given".format(association_id),
f"An invalid association-id of '{association_id}' was given",
)
@ -760,7 +696,7 @@ class InvalidVpcEndPointIdError(EC2ClientError):
def __init__(self, vpc_end_point_id):
super().__init__(
"InvalidVpcEndpointId.NotFound",
"The VpcEndPoint ID '{0}' does not exist".format(vpc_end_point_id),
f"The VpcEndPoint ID '{vpc_end_point_id}' does not exist",
)
@ -768,9 +704,7 @@ class InvalidTaggableResourceType(EC2ClientError):
def __init__(self, resource_type):
super().__init__(
"InvalidParameterValue",
"'{}' is not a valid taggable resource type for this operation.".format(
resource_type
),
f"'{resource_type}' is not a valid taggable resource type for this operation.",
)
@ -778,7 +712,7 @@ class GenericInvalidParameterValueError(EC2ClientError):
def __init__(self, attribute, value):
super().__init__(
"InvalidParameterValue",
"invalid value for parameter {0}: {1}".format(attribute, value),
f"invalid value for parameter {attribute}: {value}",
)
@ -786,9 +720,7 @@ class InvalidSubnetCidrBlockAssociationID(EC2ClientError):
def __init__(self, association_id):
super().__init__(
"InvalidSubnetCidrBlockAssociationID.NotFound",
"The subnet CIDR block with association ID '{0}' does not exist".format(
association_id
),
f"The subnet CIDR block with association ID '{association_id}' does not exist",
)
@ -796,7 +728,7 @@ class InvalidCarrierGatewayID(EC2ClientError):
def __init__(self, carrier_gateway_id):
super().__init__(
"InvalidCarrierGatewayID.NotFound",
"The CarrierGateway ID '{0}' does not exist".format(carrier_gateway_id),
f"The CarrierGateway ID '{carrier_gateway_id}' does not exist",
)

View File

@ -101,7 +101,7 @@ class Ami(TaggedEC2Resource):
# AWS auto-creates these, we should reflect the same.
volume = self.ec2_backend.create_volume(size=15, zone_name=region_name)
snapshot_description = (
snapshot_description or "Auto-created snapshot for AMI %s" % self.id
snapshot_description or f"Auto-created snapshot for AMI {self.id}"
)
self.ebs_snapshot = self.ec2_backend.create_snapshot(
volume.id, snapshot_description, self.owner_id, from_ami=ami_id

View File

@ -45,23 +45,19 @@ class RegionsAndZonesBackend:
for region in Session().get_available_regions("ec2"):
if region in regions_opt_in_not_required:
regions.append(
Region(
region, "ec2.{}.amazonaws.com".format(region), "opt-in-not-required"
)
Region(region, f"ec2.{region}.amazonaws.com", "opt-in-not-required")
)
else:
regions.append(
Region(region, "ec2.{}.amazonaws.com".format(region), "not-opted-in")
Region(region, f"ec2.{region}.amazonaws.com", "not-opted-in")
)
for region in Session().get_available_regions("ec2", partition_name="aws-us-gov"):
regions.append(
Region(region, "ec2.{}.amazonaws.com".format(region), "opt-in-not-required")
Region(region, f"ec2.{region}.amazonaws.com", "opt-in-not-required")
)
for region in Session().get_available_regions("ec2", partition_name="aws-cn"):
regions.append(
Region(
region, "ec2.{}.amazonaws.com.cn".format(region), "opt-in-not-required"
)
Region(region, f"ec2.{region}.amazonaws.com.cn", "opt-in-not-required")
)
zones = {

View File

@ -303,7 +303,7 @@ class NetworkInterfaceBackend:
]
else:
self.raise_not_implemented_error(
"The filter '{0}' for DescribeNetworkInterfaces".format(_filter)
f"The filter '{_filter}' for DescribeNetworkInterfaces"
)
return enis

View File

@ -225,11 +225,7 @@ class FlowLogsBackend:
# the unsuccessful status for the
# given resource_id
unsuccessful.append(
(
resource_id,
"400",
"LogDestination: {0} does not exist.".format(arn),
)
(resource_id, "400", f"LogDestination: {arn} does not exist.")
)
continue
elif log_destination_type == "cloud-watch-logs":

View File

@ -117,7 +117,7 @@ class InstanceType(dict):
self[name] = value
def __repr__(self):
return "<InstanceType: %s>" % self.name
return f"<InstanceType: {self.name}>"
def get_filter_value(self, filter_name):
def stringify(v):

View File

@ -127,11 +127,11 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
ami = amis[0] if amis else None
if ami is None:
warnings.warn(
"Could not find AMI with image-id:{0}, "
f"Could not find AMI with image-id:{self.image_id}, "
"in the near future this will "
"cause an error.\n"
"Use ec2_backend.describe_images() to "
"find suitable image for your test".format(self.image_id),
"find suitable image for your test",
PendingDeprecationWarning,
)
@ -238,9 +238,9 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
def private_dns(self):
formatted_ip = self.private_ip.replace(".", "-")
if self.region_name == "us-east-1":
return "ip-{0}.ec2.internal".format(formatted_ip)
return f"ip-{formatted_ip}.ec2.internal"
else:
return "ip-{0}.{1}.compute.internal".format(formatted_ip, self.region_name)
return f"ip-{formatted_ip}.{self.region_name}.compute.internal"
@property
def public_ip(self):
@ -251,11 +251,9 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
if self.public_ip:
formatted_ip = self.public_ip.replace(".", "-")
if self.region_name == "us-east-1":
return "ec2-{0}.compute-1.amazonaws.com".format(formatted_ip)
return f"ec2-{formatted_ip}.compute-1.amazonaws.com"
else:
return "ec2-{0}.{1}.compute.amazonaws.com".format(
formatted_ip, self.region_name
)
return f"ec2-{formatted_ip}.{self.region_name}.compute.amazonaws.com"
@staticmethod
def cloudformation_name_type():
@ -358,8 +356,8 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self._state.name = "stopped"
self._state.code = 80
self._reason = "User initiated ({0})".format(
datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
self._reason = (
f"User initiated ({datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')})"
)
self._state_reason = StateReason(
"Client.UserInitiatedShutdown: User initiated shutdown",
@ -408,8 +406,8 @@ class Instance(TaggedEC2Resource, BotoInstance, CloudFormationModel):
self._state.name = "terminated"
self._state.code = 48
self._reason = "User initiated ({0})".format(
datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
self._reason = (
f"User initiated ({datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')})"
)
self._state_reason = StateReason(
"Client.UserInitiatedShutdown: User initiated shutdown",
@ -600,7 +598,7 @@ class InstanceBackend:
count: int,
user_data: Optional[str],
security_group_names: List[str],
**kwargs: Any
**kwargs: Any,
) -> Reservation:
location_type = "availability-zone" if kwargs.get("placement") else "region"
default_region = "us-east-1"

View File

@ -141,7 +141,7 @@ class InternetGatewayBackend:
igw = self.get_internet_gateway(internet_gateway_id)
if igw.vpc:
raise DependencyViolationError(
"{0} is being utilized by {1}".format(internet_gateway_id, igw.vpc.id)
f"{internet_gateway_id} is being utilized by {igw.vpc.id}"
)
self.internet_gateways.pop(internet_gateway_id)
return True

View File

@ -30,9 +30,7 @@ class ManagedPrefixList(TaggedEC2Resource):
self.delete_counter = 1
def arn(self, region, owner_id):
return "arn:aws:ec2:{region}:{owner_id}:prefix-list/{resource_id}".format(
region=region, resource_id=self.id, owner_id=owner_id
)
return f"arn:aws:ec2:{region}:{owner_id}:prefix-list/{self.id}"
@property
def owner_id(self):
@ -142,7 +140,7 @@ class ManagedPrefixListBackend:
managed_prefix_list = self.create_managed_prefix_list(
address_family="IPv4",
entry=entry,
prefix_list_name="com.amazonaws.{}.s3".format(self.region_name),
prefix_list_name=f"com.amazonaws.{self.region_name}.s3",
owner_id="aws",
)
managed_prefix_list.version = None
@ -159,7 +157,7 @@ class ManagedPrefixListBackend:
managed_prefix_list = self.create_managed_prefix_list(
address_family="IPv4",
entry=entry,
prefix_list_name="com.amazonaws.{}.dynamodb".format(self.region_name),
prefix_list_name=f"com.amazonaws.{self.region_name}.dynamodb",
owner_id="aws",
)
managed_prefix_list.version = None

View File

@ -162,9 +162,7 @@ class RouteTableBackend:
route_table = self.get_route_table(route_table_id)
if route_table.associations:
raise DependencyViolationError(
"The routeTable '{0}' has dependencies and cannot be deleted.".format(
route_table_id
)
f"The routeTable '{route_table_id}' has dependencies and cannot be deleted."
)
self.route_tables.pop(route_table_id)
return True

View File

@ -523,9 +523,7 @@ class SecurityGroupBackend:
def _delete_security_group(self, vpc_id, group_id):
vpc_id = vpc_id or self.default_vpc.id
if self.groups[vpc_id][group_id].enis:
raise DependencyViolationError(
"{0} is being utilized by {1}".format(group_id, "ENIs")
)
raise DependencyViolationError(f"{group_id} is being utilized by ENIs")
return self.groups[vpc_id].pop(group_id)
def delete_security_group(self, name=None, group_id=None):

View File

@ -51,7 +51,7 @@ class SpotInstanceRequest(BotoSpotRequest, TaggedEC2Resource):
self.status_message = "Your Spot request has been submitted for review, and is pending evaluation."
if price:
price = float(price)
price = "{0:.6f}".format(price) # round up/down to 6 decimals
price = f"{price:.6f}" # round up/down to 6 decimals
self.price = price
self.type = spot_instance_type
self.valid_from = valid_from

View File

@ -186,9 +186,7 @@ class Subnet(TaggedEC2Resource, CloudFormationModel):
def request_ip(self, ip, instance):
if ipaddress.ip_address(ip) not in self.cidr:
raise Exception(
"IP does not fall in the subnet CIDR of {0}".format(self.cidr)
)
raise Exception(f"IP does not fall in the subnet CIDR of {self.cidr}")
if ip in self._subnet_ips:
raise Exception("IP already in use")
@ -439,7 +437,5 @@ class SubnetRouteTableAssociationBackend:
def create_subnet_association(self, route_table_id, subnet_id):
subnet_association = SubnetRouteTableAssociation(route_table_id, subnet_id)
self.subnet_associations[
"{0}:{1}".format(route_table_id, subnet_id)
] = subnet_association
self.subnet_associations[f"{route_table_id}:{subnet_id}"] = subnet_association
return subnet_association

View File

@ -437,14 +437,14 @@ class VPCBackend:
]
if vpn_gateways:
raise DependencyViolationError(
"The vpc {0} has dependencies and cannot be deleted.".format(vpc_id)
f"The vpc {vpc_id} has dependencies and cannot be deleted."
)
# Delete route table if only main route table remains.
route_tables = self.describe_route_tables(filters={"vpc-id": vpc_id})
if len(route_tables) > 1:
raise DependencyViolationError(
"The vpc {0} has dependencies and cannot be deleted.".format(vpc_id)
f"The vpc {vpc_id} has dependencies and cannot be deleted."
)
for route_table in route_tables:
self.delete_route_table(route_table.id)

View File

@ -363,9 +363,7 @@ class InstanceResponse(EC2BaseResponse):
if "no_device" in device_mapping:
assert isinstance(
device_mapping["no_device"], str
), "botocore {} isn't limiting NoDevice to str type anymore, it is type:{}".format(
botocore_version, type(device_mapping["no_device"])
)
), f"botocore {botocore_version} isn't limiting NoDevice to str type anymore, it is type:{type(device_mapping['no_device'])}"
if device_mapping["no_device"] == "":
# the only legit value it can have is empty string
# and none of the other checks here matter if NoDevice

View File

@ -38,7 +38,7 @@ def xml_serialize(tree, key, value):
pass
else:
raise NotImplementedError(
'Don\'t know how to serialize "{}" to xml'.format(value.__class__)
f'Don\'t know how to serialize "{value.__class__}" to xml'
)

View File

@ -48,9 +48,9 @@ class Subnets(EC2BaseResponse):
subnet_id = self._get_param("SubnetId")
for attribute in ("MapPublicIpOnLaunch", "AssignIpv6AddressOnCreation"):
if self.querystring.get("%s.Value" % attribute):
if self.querystring.get(f"{attribute}.Value"):
attr_name = camelcase_to_underscores(attribute)
attr_value = self.querystring.get("%s.Value" % attribute)[0]
attr_value = self.querystring.get(f"{attribute}.Value")[0]
self.ec2_backend.modify_subnet_attribute(
subnet_id, attr_name, attr_value
)

View File

@ -142,9 +142,9 @@ class VPCs(EC2BaseResponse):
"EnableDnsHostnames",
"EnableNetworkAddressUsageMetrics",
):
if self.querystring.get("%s.Value" % attribute):
if self.querystring.get(f"{attribute}.Value"):
attr_name = camelcase_to_underscores(attribute)
attr_value = self.querystring.get("%s.Value" % attribute)[0]
attr_value = self.querystring.get(f"{attribute}.Value")[0]
self.ec2_backend.modify_vpc_attribute(vpc_id, attr_name, attr_value)
return MODIFY_VPC_ATTRIBUTE_RESPONSE
return None

View File

@ -235,7 +235,7 @@ def random_carrier_gateway_id():
def random_public_ip():
return "54.214.{0}.{1}".format(random.choice(range(255)), random.choice(range(255)))
return f"54.214.{random.choice(range(255))}.{random.choice(range(255))}"
def random_private_ip(cidr=None, ipv6=False):
@ -249,39 +249,29 @@ def random_private_ip(cidr=None, ipv6=False):
ula = ipaddress.IPv4Network(cidr)
return str(ula.network_address + (random.getrandbits(32 - ula.prefixlen)))
if ipv6:
return "2001::cafe:%x/64" % random.getrandbits(16)
return "10.{0}.{1}.{2}".format(
random.choice(range(255)), random.choice(range(255)), random.choice(range(255))
)
return f"2001::cafe:{random.getrandbits(16)}x/64"
return f"10.{random.choice(range(255))}.{random.choice(range(255))}.{random.choice(range(255))}"
def random_ip():
return "127.{0}.{1}.{2}".format(
random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)
)
return f"127.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
def generate_dns_from_ip(ip, dns_type="internal"):
splits = ip.split("/")[0].split(".") if "/" in ip else ip.split(".")
return "ip-{}-{}-{}-{}.ec2.{}".format(
splits[0], splits[1], splits[2], splits[3], dns_type
)
return f"ip-{splits[0]}-{splits[1]}-{splits[2]}-{splits[3]}.ec2.{dns_type}"
def random_mac_address():
return "02:00:00:%02x:%02x:%02x" % (
random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255),
)
return f"02:00:00:{random.randint(0, 255)}02x:{random.randint(0, 255)}02x:{random.randint(0, 255)}02x"
def randor_ipv4_cidr():
return "10.0.{}.{}/16".format(random.randint(0, 255), random.randint(0, 255))
return f"10.0.{random.randint(0, 255)}.{random.randint(0, 255)}/16"
def random_ipv6_cidr():
return "2400:6500:{}:{}00::/56".format(random_resource_id(4), random_resource_id(2))
return f"2400:6500:{random_resource_id(4)}:{random_resource_id(2)}00::/56"
def generate_route_id(
@ -291,7 +281,7 @@ def generate_route_id(
cidr_block = ipv6_cidr_block
if prefix_list and not cidr_block:
cidr_block = prefix_list
return "%s~%s" % (route_table_id, cidr_block)
return f"{route_table_id}~{cidr_block}"
def random_managed_prefix_list_id():
@ -300,9 +290,9 @@ def random_managed_prefix_list_id():
def create_dns_entries(service_name, vpc_endpoint_id):
dns_entries = {}
dns_entries["dns_name"] = "{}-{}.{}".format(
vpc_endpoint_id, random_resource_id(8), service_name
)
dns_entries[
"dns_name"
] = f"{vpc_endpoint_id}-{random_resource_id(8)}.{service_name}"
dns_entries["hosted_zone_id"] = random_resource_id(13).upper()
return dns_entries
@ -310,9 +300,7 @@ def create_dns_entries(service_name, vpc_endpoint_id):
def utc_date_and_time():
x = datetime.utcnow()
# Better performing alternative to x.strftime("%Y-%m-%dT%H:%M:%S.000Z")
return "{}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.000Z".format(
x.year, x.month, x.day, x.hour, x.minute, x.second
)
return f"{x.year}-{x.month:02d}-{x.day:02d}T{x.hour:02d}:{x.minute:02d}:{x.second:02d}.000Z"
def split_route_id(route_id):
@ -322,7 +310,7 @@ def split_route_id(route_id):
def get_attribute_value(parameter, querystring_dict):
for key, value in querystring_dict.items():
match = re.search(r"{0}.Value".format(parameter), key)
match = re.search(rf"{parameter}.Value", key)
if match:
if value[0].lower() in ["true", "false"]:
return True if value[0].lower() in ["true"] else False

View File

@ -112,9 +112,9 @@ class Repository(BaseObject, CloudFormationModel):
)
if not image:
image_id_rep = "{{imageDigest:'{0}', imageTag:'{1}'}}".format(
image_digest or "null", image_tag or "null"
)
idigest = image_digest or "null"
itag = image_tag or "null"
image_id_rep = f"{{imageDigest:'{idigest}', imageTag:'{itag}'}}"
raise ImageNotFoundException(
image_id=image_id_rep,
@ -249,9 +249,9 @@ class Image(BaseObject):
self.last_scan = None
def _create_digest(self):
image_contents = "docker_image{0}".format(int(random.random() * 10**6))
image_contents = f"docker_image{int(random.random() * 10**6)}"
self.image_digest = (
"sha256:%s" % hashlib.sha256(image_contents.encode("utf-8")).hexdigest()
"sha256:" + hashlib.sha256(image_contents.encode("utf-8")).hexdigest()
)
def get_image_digest(self):
@ -488,7 +488,7 @@ class ECRBackend(BaseBackend):
if repository_name in self.repositories:
repository = self.repositories[repository_name]
else:
raise Exception("{0} is not a repository".format(repository_name))
raise Exception(f"{repository_name} is not a repository")
# Tags are unique, so delete any existing image with this tag first
self.batch_delete_image(
@ -885,10 +885,9 @@ class ECRBackend(BaseBackend):
image = repo._get_image(image_id.get("imageTag"), image_id.get("imageDigest"))
if not image.last_scan:
image_id_rep = "{{imageDigest:'{0}', imageTag:'{1}'}}".format(
image_id.get("imageDigest") or "null",
image_id.get("imageTag") or "null",
)
idigest = image_id.get("imageDigest") or "null"
itag = image_id.get("imageTag") or "null"
image_id_rep = f"{{imageDigest:'{idigest}', imageTag:'{itag}'}}"
raise ScanNotFoundException(
image_id=image_id_rep,
repository_name=repository_name,

View File

@ -137,15 +137,13 @@ class ECRResponse(BaseResponse):
registry_ids = [self.current_account]
auth_data = []
for registry_id in registry_ids:
password = "{}-auth-token".format(registry_id)
auth_token = b64encode("AWS:{}".format(password).encode("ascii")).decode()
password = f"{registry_id}-auth-token"
auth_token = b64encode(f"AWS:{password}".encode("ascii")).decode()
auth_data.append(
{
"authorizationToken": auth_token,
"expiresAt": time.mktime(datetime(2015, 1, 1).timetuple()),
"proxyEndpoint": "https://{}.dkr.ecr.{}.amazonaws.com".format(
registry_id, self.region
),
"proxyEndpoint": f"https://{registry_id}.dkr.ecr.{self.region}.amazonaws.com",
}
)
return json.dumps({"authorizationData": auth_data})

View File

@ -239,7 +239,7 @@ class TaskDefinition(BaseObject, CloudFormationModel):
properties = cloudformation_json["Properties"]
family = properties.get(
"Family", "task-definition-{0}".format(int(mock_random.random() * 10**6))
"Family", f"task-definition-{int(mock_random.random() * 10**6)}"
)
container_definitions = remap_nested_keys(
properties.get("ContainerDefinitions", []), pascal_to_camelcase
@ -262,7 +262,7 @@ class TaskDefinition(BaseObject, CloudFormationModel):
):
properties = cloudformation_json["Properties"]
family = properties.get(
"Family", "task-definition-{0}".format(int(mock_random.random() * 10**6))
"Family", f"task-definition-{int(mock_random.random() * 10**6)}"
)
container_definitions = properties["ContainerDefinitions"]
volumes = properties.get("Volumes")
@ -424,7 +424,7 @@ class Service(BaseObject, CloudFormationModel):
{
"createdAt": datetime.now(pytz.utc),
"desiredCount": self.desired_count,
"id": "ecs-svc/{}".format(mock_random.randint(0, 32**12)),
"id": f"ecs-svc/{mock_random.randint(0, 32**12)}",
"launchType": self.launch_type,
"pendingCount": self.desired_count,
"runningCount": 0,
@ -751,7 +751,7 @@ class TaskSet(BaseObject):
self.createdAt = datetime.now(pytz.utc)
self.updatedAt = datetime.now(pytz.utc)
self.stabilityStatusAt = datetime.now(pytz.utc)
self.id = "ecs-svc/{}".format(mock_random.randint(0, 32**12))
self.id = f"ecs-svc/{mock_random.randint(0, 32**12)}"
self.service_arn = ""
self.cluster_arn = ""
@ -843,7 +843,7 @@ class EC2ContainerServiceBackend(BaseBackend):
):
return self.task_definitions[family][revision]
else:
raise Exception("{0} is not a task_definition".format(task_definition_name))
raise Exception(f"{task_definition_name} is not a task_definition")
def create_cluster(
self, cluster_name: str, tags: Any = None, cluster_settings: Any = None
@ -1023,7 +1023,7 @@ class EC2ContainerServiceBackend(BaseBackend):
self.container_instances.get(cluster.name, {}).keys()
)
if not container_instances:
raise Exception("No instances found in cluster {}".format(cluster.name))
raise Exception(f"No instances found in cluster {cluster.name}")
active_container_instances = [
x
for x in container_instances
@ -1255,7 +1255,7 @@ class EC2ContainerServiceBackend(BaseBackend):
task_id = task_str.split("/")[-1]
tasks = self.tasks.get(cluster.name, None)
if not tasks:
raise Exception("Cluster {} has no registered tasks".format(cluster.name))
raise Exception(f"Cluster {cluster.name} has no registered tasks")
for task in tasks.keys():
if task.endswith(task_id):
container_instance_arn = tasks[task].container_instance_arn
@ -1269,9 +1269,7 @@ class EC2ContainerServiceBackend(BaseBackend):
tasks[task].desired_status = "STOPPED"
tasks[task].stopped_reason = reason
return tasks[task]
raise Exception(
"Could not find task {} on cluster {}".format(task_str, cluster.name)
)
raise Exception(f"Could not find task {task_str} on cluster {cluster.name}")
def _get_service(self, cluster_str, service_str):
cluster = self._get_cluster(cluster_str)
@ -1320,7 +1318,7 @@ class EC2ContainerServiceBackend(BaseBackend):
backend=self,
service_registries=service_registries,
)
cluster_service_pair = "{0}:{1}".format(cluster.name, service_name)
cluster_service_pair = f"{cluster.name}:{service_name}"
self.services[cluster_service_pair] = service
return service
@ -1352,7 +1350,7 @@ class EC2ContainerServiceBackend(BaseBackend):
result = []
failures = []
for name in service_names:
cluster_service_pair = "{0}:{1}".format(cluster.name, name)
cluster_service_pair = f"{cluster.name}:{name}"
if cluster_service_pair in self.services:
result.append(self.services[cluster_service_pair])
else:
@ -1369,7 +1367,7 @@ class EC2ContainerServiceBackend(BaseBackend):
cluster = self._get_cluster(cluster_str)
service_name = service_str.split("/")[-1]
cluster_service_pair = "{0}:{1}".format(cluster.name, service_name)
cluster_service_pair = f"{cluster.name}:{service_name}"
if cluster_service_pair in self.services:
if task_definition_str is not None:
self.describe_task_definition(task_definition_str)
@ -1386,7 +1384,7 @@ class EC2ContainerServiceBackend(BaseBackend):
cluster = self._get_cluster(cluster_name)
service = self._get_service(cluster_name, service_name)
cluster_service_pair = "{0}:{1}".format(cluster.name, service.name)
cluster_service_pair = f"{cluster.name}:{service.name}"
service = self.services[cluster_service_pair]
if service.desired_count > 0 and not force:
@ -1399,7 +1397,7 @@ class EC2ContainerServiceBackend(BaseBackend):
def register_container_instance(self, cluster_str, ec2_instance_id):
cluster_name = cluster_str.split("/")[-1]
if cluster_name not in self.clusters:
raise Exception("{0} is not a cluster".format(cluster_name))
raise Exception(f"{cluster_name} is not a cluster")
container_instance = ContainerInstance(
ec2_instance_id,
self.account_id,
@ -1565,15 +1563,14 @@ class EC2ContainerServiceBackend(BaseBackend):
self.container_instances[cluster_name][arn].attributes[name] = value
except KeyError:
raise JsonRESTError(
"TargetNotFoundException", "Could not find {0}".format(target_id)
"TargetNotFoundException", f"Could not find {target_id}"
)
else:
# targetId is container uuid, targetType must be container-instance
try:
if target_type != "container-instance":
raise JsonRESTError(
"TargetNotFoundException",
"Could not find {0}".format(target_id),
"TargetNotFoundException", f"Could not find {target_id}"
)
self.container_instances[cluster_name][target_id].attributes[
@ -1581,7 +1578,7 @@ class EC2ContainerServiceBackend(BaseBackend):
] = value
except KeyError:
raise JsonRESTError(
"TargetNotFoundException", "Could not find {0}".format(target_id)
"TargetNotFoundException", f"Could not find {target_id}"
)
def list_attributes(
@ -1657,15 +1654,14 @@ class EC2ContainerServiceBackend(BaseBackend):
del instance.attributes[name]
except KeyError:
raise JsonRESTError(
"TargetNotFoundException", "Could not find {0}".format(target_id)
"TargetNotFoundException", f"Could not find {target_id}"
)
else:
# targetId is container uuid, targetType must be container-instance
try:
if target_type != "container-instance":
raise JsonRESTError(
"TargetNotFoundException",
"Could not find {0}".format(target_id),
"TargetNotFoundException", f"Could not find {target_id}"
)
instance = self.container_instances[cluster_name][target_id]
@ -1673,7 +1669,7 @@ class EC2ContainerServiceBackend(BaseBackend):
del instance.attributes[name]
except KeyError:
raise JsonRESTError(
"TargetNotFoundException", "Could not find {0}".format(target_id)
"TargetNotFoundException", f"Could not find {target_id}"
)
def list_task_definition_families(self, family_prefix=None):
@ -1802,9 +1798,7 @@ class EC2ContainerServiceBackend(BaseBackend):
service_name = service.split("/")[-1]
cluster_obj = self._get_cluster(cluster_str)
service_obj = self.services.get(
"{0}:{1}".format(cluster_obj.name, service_name)
)
service_obj = self.services.get(f"{cluster_obj.name}:{service_name}")
if not service_obj:
raise ServiceNotFoundException
@ -1824,7 +1818,7 @@ class EC2ContainerServiceBackend(BaseBackend):
cluster_obj = self._get_cluster(cluster_str)
service_name = service.split("/")[-1]
service_key = "{0}:{1}".format(cluster_obj.name, service_name)
service_key = f"{cluster_obj.name}:{service_name}"
service_obj = self.services.get(service_key)
if not service_obj:
@ -1847,7 +1841,7 @@ class EC2ContainerServiceBackend(BaseBackend):
cluster_name = cluster.split("/")[-1]
service_name = service.split("/")[-1]
service_key = "{0}:{1}".format(cluster_name, service_name)
service_key = f"{cluster_name}:{service_name}"
task_set_element = None
for i, ts in enumerate(self.services[service_key].task_sets):
if task_set == ts.task_set_arn:

View File

@ -20,7 +20,7 @@ class FileSystemAlreadyExists(EFSError):
def __init__(self, creation_token, *args, **kwargs):
super().__init__(
"FileSystemAlreadyExists",
"File system with {} already exists.".format(creation_token),
f"File system with {creation_token} already exists.",
*args,
**kwargs,
)
@ -32,7 +32,7 @@ class FileSystemNotFound(EFSError):
def __init__(self, file_system_id, *args, **kwargs):
super().__init__(
"FileSystemNotFound",
"File system {} does not exist.".format(file_system_id),
f"File system {file_system_id} does not exist.",
*args,
**kwargs,
)
@ -58,7 +58,7 @@ class MountTargetNotFound(EFSError):
def __init__(self, mount_target_id, *args, **kwargs):
super().__init__(
"MountTargetNotFound",
"Mount target '{}' does not exist.".format(mount_target_id),
f"Mount target '{mount_target_id}' does not exist.",
*args,
**kwargs,
)
@ -84,7 +84,7 @@ class SubnetNotFound(EFSError):
def __init__(self, subnet_id, *args, **kwargs):
super().__init__(
"SubnetNotFound",
"The subnet ID '{}' does not exist".format(subnet_id),
f"The subnet ID '{subnet_id}' does not exist",
*args,
**kwargs,
)
@ -96,7 +96,7 @@ class SecurityGroupNotFound(EFSError):
def __init__(self, security_group_id, *args, **kwargs):
super().__init__(
"SecurityGroupNotFound",
"The SecurityGroup ID '{}' does not exist".format(security_group_id),
f"The SecurityGroup ID '{security_group_id}' does not exist",
*args,
**kwargs,
)

View File

@ -225,7 +225,7 @@ class FileSystem(CloudFormationModel):
raise ValueError("BackupPolicy must be of type BackupPolicy.")
status = props.pop("backup_policy")["status"]
if status not in ["ENABLED", "DISABLED"]:
raise ValueError('Invalid status: "{}".'.format(status))
raise ValueError(f'Invalid status: "{status}".')
props["backup"] = status == "ENABLED"
if "bypass_policy_lockout_safety_check" in props:
raise ValueError(
@ -293,7 +293,7 @@ class MountTarget(CloudFormationModel):
# Init non-user-assigned values.
self.owner_id = account_id
self.mount_target_id = "fsmt-{}".format(mock_random.get_random_hex())
self.mount_target_id = f"fsmt-{mock_random.get_random_hex()}"
self.life_cycle_state = "available"
self.network_interface_id = None
self.availability_zone_id = subnet.availability_zone_id
@ -414,7 +414,7 @@ class EFSBackend(BaseBackend):
# Create a new file system ID:
def make_id():
return "fs-{}".format(mock_random.get_random_hex())
return f"fs-{mock_random.get_random_hex()}"
fsid = make_id()
while fsid in self.file_systems_by_id:

View File

@ -20,7 +20,7 @@ class FakeEnvironment(BaseModel):
@property
def environment_arn(self):
resource_path = "%s/%s" % (self.application_name, self.environment_name)
resource_path = f"{self.application_name}/{self.environment_name}"
return make_arn(
self.region, self.application.account_id, "environment", resource_path
)
@ -77,7 +77,7 @@ class EBBackend(BaseBackend):
def create_application(self, application_name):
if application_name in self.applications:
raise InvalidParameterValueError(
"Application {} already exists.".format(application_name)
f"Application {application_name} already exists."
)
new_app = FakeApplication(backend=self, application_name=application_name)
self.applications[application_name] = new_app
@ -104,7 +104,7 @@ class EBBackend(BaseBackend):
res = self._find_environment_by_arn(resource_arn)
except KeyError:
raise ResourceNotFoundException(
"Resource not found for ARN '{}'.".format(resource_arn)
f"Resource not found for ARN '{resource_arn}'."
)
for key, value in tags_to_add.items():
@ -118,7 +118,7 @@ class EBBackend(BaseBackend):
res = self._find_environment_by_arn(resource_arn)
except KeyError:
raise ResourceNotFoundException(
"Resource not found for ARN '{}'.".format(resource_arn)
f"Resource not found for ARN '{resource_arn}'."
)
return res.tags

View File

@ -17,7 +17,7 @@ class Pipeline(BaseModel):
):
a = "".join(random.choice(string.digits) for _ in range(13))
b = "".join(random.choice(string.ascii_lowercase) for _ in range(6))
self.id = "{}-{}".format(a, b)
self.id = f"{a}-{b}"
self.name = name
self.arn = f"arn:aws:elastictranscoder:{region}:{account_id}:pipeline/{self.id}"
self.status = "Active"

View File

@ -40,7 +40,7 @@ class ElasticTranscoderResponse(BaseResponse):
if not role:
return self.err("Role cannot be blank")
if not re.match("^arn:aws:iam::[0-9]+:role/.+", role):
return self.err("Role ARN is invalid: {}".format(role))
return self.err(f"Role ARN is invalid: {role}")
if not output_bucket and not content_config:
return self.err(
"[OutputBucket and ContentConfig:Bucket are not allowed to both be null.]"
@ -75,9 +75,7 @@ class ElasticTranscoderResponse(BaseResponse):
def validate_pipeline_id(self, pipeline_id):
r = "^\\d{13}-\\w{6}$"
if not re.match(r, pipeline_id):
msg = "1 validation error detected: Value '{}' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: {}".format(
pipeline_id, r
)
msg = f"1 validation error detected: Value '{pipeline_id}' at 'id' failed to satisfy constraint: Member must satisfy regular expression pattern: {r}"
return self.err(msg)
try:
self.elastictranscoder_backend.read_pipeline(pipeline_id)

View File

@ -11,7 +11,7 @@ class ELBClientError(RESTError):
class DuplicateTagKeysError(ELBClientError):
def __init__(self, cidr):
super().__init__(
"DuplicateTagKeys", "Tag key was specified more than once: {0}".format(cidr)
"DuplicateTagKeys", f"Tag key was specified more than once: {cidr}"
)
@ -26,7 +26,7 @@ class LoadBalancerNotFoundError(ELBClientError):
def __init__(self, cidr):
super().__init__(
"LoadBalancerNotFound",
"The specified load balancer does not exist: {0}".format(cidr),
f"The specified load balancer does not exist: {cidr}",
)
@ -57,9 +57,7 @@ class DuplicateListenerError(ELBClientError):
def __init__(self, name, port):
super().__init__(
"DuplicateListener",
"A listener already exists for {0} with LoadBalancerPort {1}, but with a different InstancePort, Protocol, or SSLCertificateId".format(
name, port
),
f"A listener already exists for {name} with LoadBalancerPort {port}, but with a different InstancePort, Protocol, or SSLCertificateId",
)
@ -67,9 +65,7 @@ class DuplicateLoadBalancerName(ELBClientError):
def __init__(self, name):
super().__init__(
"DuplicateLoadBalancerName",
"The specified load balancer name already exists for this account: {0}".format(
name
),
f"The specified load balancer name already exists for this account: {name}",
)

View File

@ -43,13 +43,7 @@ class FakeListener(BaseModel):
self.policy_names = []
def __repr__(self):
return "FakeListener(lbp: %s, inp: %s, pro: %s, cid: %s, policies: %s)" % (
self.load_balancer_port,
self.instance_port,
self.protocol,
self.ssl_certificate_id,
self.policy_names,
)
return f"FakeListener(lbp: {self.load_balancer_port}, inp: {self.instance_port}, pro: {self.protocol}, cid: {self.ssl_certificate_id}, policies: {self.policy_names})"
class FakeBackend(BaseModel):
@ -58,10 +52,7 @@ class FakeBackend(BaseModel):
self.policy_names = []
def __repr__(self):
return "FakeBackend(inp: %s, policies: %s)" % (
self.instance_port,
self.policy_names,
)
return f"FakeBackend(inp: {self.instance_port}, policies: {self.policy_names})"
class FakeLoadBalancer(CloudFormationModel):
@ -91,7 +82,7 @@ class FakeLoadBalancer(CloudFormationModel):
self.subnets = subnets or []
self.vpc_id = vpc_id
self.tags = {}
self.dns_name = "%s.us-east-1.elb.amazonaws.com" % (name)
self.dns_name = f"{name}.us-east-1.elb.amazonaws.com"
for port in ports:
listener = FakeListener(

View File

@ -315,7 +315,7 @@ class ELBResponse(BaseResponse):
if "LoadBalancerNames.member" in key:
number = key.split(".")[2]
load_balancer_name = self._get_param(
"LoadBalancerNames.member.{0}".format(number)
f"LoadBalancerNames.member.{number}"
)
elb = self.elb_backend.get_load_balancer(load_balancer_name)
if not elb:
@ -335,7 +335,7 @@ class ELBResponse(BaseResponse):
if "LoadBalancerNames.member" in key:
number = key.split(".")[2]
load_balancer_name = self._get_param(
"LoadBalancerNames.member.{0}".format(number)
f"LoadBalancerNames.member.{number}"
)
elb = self.elb_backend.get_load_balancer(load_balancer_name)
if not elb:

View File

@ -31,7 +31,7 @@ def api_version_elb_backend(*args, **kwargs):
elif "2015-12-01" == version:
return ELBV2Response.dispatch(*args, **kwargs)
else:
raise Exception("Unknown ELB API version: {}".format(version))
raise Exception(f"Unknown ELB API version: {version}")
url_bases = [r"https?://elasticloadbalancing\.(.+)\.amazonaws.com"]

View File

@ -11,7 +11,7 @@ class ELBClientError(RESTError):
class DuplicateTagKeysError(ELBClientError):
def __init__(self, cidr):
super().__init__(
"DuplicateTagKeys", "Tag key was specified more than once: {0}".format(cidr)
"DuplicateTagKeys", f"Tag key was specified more than once: {cidr}"
)
@ -107,10 +107,10 @@ class InvalidConditionFieldError(ELBClientError):
]
def __init__(self, invalid_name):
valid = ",".join(self.VALID_FIELDS)
super().__init__(
"ValidationError",
"Condition field '%s' must be one of '[%s]'"
% (invalid_name, ",".join(self.VALID_FIELDS)),
f"Condition field '{invalid_name}' must be one of '[{valid}]'",
)
@ -123,14 +123,13 @@ class InvalidActionTypeError(ELBClientError):
def __init__(self, invalid_name, index):
super().__init__(
"ValidationError",
"1 validation error detected: Value '%s' at 'actions.%s.member.type' failed to satisfy constraint: Member must satisfy enum value set: [forward, redirect, fixed-response]"
% (invalid_name, index),
f"1 validation error detected: Value '{invalid_name}' at 'actions.{index}.member.type' failed to satisfy constraint: Member must satisfy enum value set: [forward, redirect, fixed-response]",
)
class ActionTargetGroupNotFoundError(ELBClientError):
def __init__(self, arn):
super().__init__("TargetGroupNotFound", "Target group '%s' not found" % arn)
super().__init__("TargetGroupNotFound", f"Target group '{arn}' not found")
class ListenerOrBalancerMissingError(ELBClientError):
@ -160,8 +159,7 @@ class RuleNotFoundError(ELBClientError):
class DuplicatePriorityError(ELBClientError):
def __init__(self, invalid_value):
super().__init__(
"ValidationError",
"Priority '%s' was provided multiple times" % invalid_value,
"ValidationError", f"Priority '{invalid_value}' was provided multiple times"
)

View File

@ -372,8 +372,8 @@ class FakeListenerRule(CloudFormationModel):
class FakeRule(BaseModel):
def __init__(self, listener_arn, conditions, priority, actions, is_default):
self.listener_arn = listener_arn
self.arn = listener_arn.replace(":listener/", ":listener-rule/") + "/%s" % (
id(self)
self.arn = (
listener_arn.replace(":listener/", ":listener-rule/") + f"/{id(self)}"
)
self.conditions = conditions
self.priority = priority # int or 'default'
@ -506,10 +506,7 @@ class FakeBackend(BaseModel):
self.policy_names = []
def __repr__(self):
return "FakeBackend(inp: %s, policies: %s)" % (
self.instance_port,
self.policy_names,
)
return f"FakeBackend(inp: {self.instance_port}, policies: {self.policy_names})"
class FakeLoadBalancer(CloudFormationModel):
@ -639,7 +636,7 @@ class FakeLoadBalancer(CloudFormationModel):
return self.name
elif attribute_name in not_implemented_yet:
raise NotImplementedError(
'"Fn::GetAtt" : [ "{0}" , "%s" ]"' % attribute_name
f'"Fn::GetAtt" : [ "{{0}}" , "{attribute_name}" ]"'
)
else:
raise UnformattedGetAttTemplateException()
@ -701,7 +698,7 @@ class ELBv2Backend(BaseBackend):
arn = make_arn_for_load_balancer(
account_id=self.account_id, name=name, region_name=self.region_name
)
dns_name = "%s-1.%s.elb.amazonaws.com" % (name, self.region_name)
dns_name = f"{name}-1.{self.region_name}.elb.amazonaws.com"
if arn in self.load_balancers:
raise DuplicateLoadBalancerName()
@ -786,7 +783,7 @@ class ELBv2Backend(BaseBackend):
"path-pattern",
]:
raise InvalidConditionValueError(
"The 'Values' field is not compatible with '%s'" % field
f"The 'Values' field is not compatible with '{field}'"
)
else:
method_name = "_validate_" + field.replace("-", "_") + "_condition"
@ -950,10 +947,8 @@ class ELBv2Backend(BaseBackend):
expression = r"^(2|4|5)\d\d$"
if not re.match(expression, status_code):
raise InvalidStatusCodeActionTypeError(
"1 validation error detected: Value '{}' at 'actions.{}.member.fixedResponseConfig.statusCode' failed to satisfy constraint: \
Member must satisfy regular expression pattern: {}".format(
status_code, index, expression
)
f"1 validation error detected: Value '{status_code}' at 'actions.{index}.member.fixedResponseConfig.statusCode' failed to satisfy constraint: \
Member must satisfy regular expression pattern: {expression}"
)
content_type = action.data["FixedResponseConfig"].get("ContentType")
if content_type and content_type not in [
@ -970,15 +965,11 @@ Member must satisfy regular expression pattern: {}".format(
def create_target_group(self, name, **kwargs):
if len(name) > 32:
raise InvalidTargetGroupNameError(
"Target group name '{}' cannot be longer than '32' characters".format(
name
)
f"Target group name '{name}' cannot be longer than '32' characters"
)
if not re.match(r"^[a-zA-Z0-9\-]+$", name):
raise InvalidTargetGroupNameError(
"Target group name '{}' can only contain characters that are alphanumeric characters or hyphens(-)".format(
name
)
f"Target group name '{name}' can only contain characters that are alphanumeric characters or hyphens(-)"
)
# undocumented validation
@ -990,7 +981,7 @@ Member must satisfy regular expression pattern: {}".format(
if name.startswith("-") or name.endswith("-"):
raise InvalidTargetGroupNameError(
"Target group name '%s' cannot begin or end with '-'" % name
f"Target group name '{name}' cannot begin or end with '-'"
)
for target_group in self.target_groups.values():
if target_group.name == name:
@ -1002,17 +993,13 @@ Member must satisfy regular expression pattern: {}".format(
and kwargs["healthcheck_protocol"] not in valid_protocols
):
raise InvalidConditionValueError(
"Value {} at 'healthCheckProtocol' failed to satisfy constraint: "
"Member must satisfy enum value set: {}".format(
kwargs["healthcheck_protocol"], valid_protocols
)
f"Value {kwargs['healthcheck_protocol']} at 'healthCheckProtocol' failed to satisfy constraint: "
f"Member must satisfy enum value set: {valid_protocols}"
)
if kwargs.get("protocol") and kwargs["protocol"] not in valid_protocols:
raise InvalidConditionValueError(
"Value {} at 'protocol' failed to satisfy constraint: "
"Member must satisfy enum value set: {}".format(
kwargs["protocol"], valid_protocols
)
f"Value {kwargs['protocol']} at 'protocol' failed to satisfy constraint: "
f"Member must satisfy enum value set: {valid_protocols}"
)
if (
@ -1088,9 +1075,9 @@ Member must satisfy regular expression pattern: {}".format(
self._validate_actions(default_actions)
arn = load_balancer_arn.replace(":loadbalancer/", ":listener/") + "/%s%s" % (
port,
id(self),
arn = (
load_balancer_arn.replace(":loadbalancer/", ":listener/")
+ f"/{port}{id(self)}"
)
listener = FakeListener(
load_balancer_arn,
@ -1246,9 +1233,7 @@ Member must satisfy regular expression pattern: {}".format(
if target_group:
if self._any_listener_using(target_group_arn):
raise ResourceInUseError(
"The target group '{}' is currently in use by a listener or a rule".format(
target_group_arn
)
f"The target group '{target_group_arn}' is currently in use by a listener or a rule"
)
del self.target_groups[target_group_arn]
return target_group
@ -1376,7 +1361,7 @@ Member must satisfy regular expression pattern: {}".format(
if self.ec2_backend.get_security_group_from_id(sec_group_id) is None:
raise RESTError(
"InvalidSecurityGroup",
"Security group {0} does not exist".format(sec_group_id),
f"Security group {sec_group_id} does not exist",
)
balancer.security_groups = sec_groups
@ -1430,9 +1415,7 @@ Member must satisfy regular expression pattern: {}".format(
for key in attrs:
if key not in FakeLoadBalancer.VALID_ATTRS:
raise RESTError(
"InvalidConfigurationRequest", "Key {0} not valid".format(key)
)
raise RESTError("InvalidConfigurationRequest", f"Key {key} not valid")
balancer.attrs.update(attrs)
return balancer.attrs
@ -1510,7 +1493,7 @@ Member must satisfy regular expression pattern: {}".format(
if protocol not in (None, "HTTP", "HTTPS", "TCP"):
raise RESTError(
"UnsupportedProtocol", "Protocol {0} is not supported".format(protocol)
"UnsupportedProtocol", f"Protocol {protocol} is not supported"
)
# HTTPS checks
@ -1524,7 +1507,7 @@ Member must satisfy regular expression pattern: {}".format(
if not self._certificate_exists(certificate_arn=default_cert_arn):
raise RESTError(
"CertificateNotFound",
"Certificate {0} not found".format(default_cert_arn),
f"Certificate {default_cert_arn} not found",
)
listener.certificate = default_cert_arn
listener.certificates = certificates

View File

@ -585,9 +585,7 @@ class ELBV2Response(BaseResponse):
if ssl_policy is not None and ssl_policy not in [
item["name"] for item in SSL_POLICIES
]:
raise RESTError(
"SSLPolicyNotFound", "Policy {0} not found".format(ssl_policy)
)
raise RESTError("SSLPolicyNotFound", f"Policy {ssl_policy} not found")
listener = self.elbv2_backend.modify_listener(
arn, port, protocol, ssl_policy, certificates, default_actions

View File

@ -1,10 +1,6 @@
def make_arn_for_load_balancer(account_id, name, region_name):
return "arn:aws:elasticloadbalancing:{}:{}:loadbalancer/app/{}/50dc6c495c0c9188".format(
region_name, account_id, name
)
return f"arn:aws:elasticloadbalancing:{region_name}:{account_id}:loadbalancer/app/{name}/50dc6c495c0c9188"
def make_arn_for_target_group(account_id, name, region_name):
return "arn:aws:elasticloadbalancing:{}:{}:targetgroup/{}/50dc6c495c0c9188".format(
region_name, account_id, name
)
return f"arn:aws:elasticloadbalancing:{region_name}:{account_id}:targetgroup/{name}/50dc6c495c0c9188"

View File

@ -278,9 +278,7 @@ class FakeCluster(BaseModel):
@property
def arn(self):
return "arn:aws:elasticmapreduce:{0}:{1}:cluster/{2}".format(
self.emr_backend.region_name, self.emr_backend.account_id, self.id
)
return f"arn:aws:elasticmapreduce:{self.emr_backend.region_name}:{self.emr_backend.account_id}:cluster/{self.id}"
@property
def instance_groups(self):
@ -591,7 +589,7 @@ class ElasticMapReduceBackend(BaseBackend):
emr_managed_master_security_group,
emr_managed_slave_security_group,
service_access_security_group,
**_
**_,
):
default_return_value = (
emr_managed_master_security_group,
@ -608,10 +606,10 @@ class ElasticMapReduceBackend(BaseBackend):
subnet = self.ec2_backend.get_subnet(ec2_subnet_id)
except InvalidSubnetIdError:
warnings.warn(
"Could not find Subnet with id: {0}\n"
f"Could not find Subnet with id: {ec2_subnet_id}\n"
"In the near future, this will raise an error.\n"
"Use ec2.describe_subnets() to find a suitable id "
"for your test.".format(ec2_subnet_id),
"for your test.",
PendingDeprecationWarning,
)
return default_return_value
@ -674,9 +672,7 @@ class ElasticMapReduceBackend(BaseBackend):
def create_security_configuration(self, name, security_configuration):
if name in self.security_configurations:
raise InvalidRequestException(
message="SecurityConfiguration with name '{}' already exists.".format(
name
)
message=f"SecurityConfiguration with name '{name}' already exists."
)
security_configuration = FakeSecurityConfiguration(
name=name, security_configuration=security_configuration
@ -687,18 +683,14 @@ class ElasticMapReduceBackend(BaseBackend):
def get_security_configuration(self, name):
if name not in self.security_configurations:
raise InvalidRequestException(
message="Security configuration with name '{}' does not exist.".format(
name
)
message=f"Security configuration with name '{name}' does not exist."
)
return self.security_configurations[name]
def delete_security_configuration(self, name):
if name not in self.security_configurations:
raise InvalidRequestException(
message="Security configuration with name '{}' does not exist.".format(
name
)
message=f"Security configuration with name '{name}' does not exist."
)
del self.security_configurations[name]

View File

@ -308,7 +308,7 @@ class ElasticMapReduceResponse(BaseResponse):
config.pop(key)
config["properties"] = {}
map_items = self._get_map_prefix(
"Configurations.member.{0}.Properties.entry".format(idx)
f"Configurations.member.{idx}.Properties.entry"
)
config["properties"] = map_items
@ -439,7 +439,7 @@ class ElasticMapReduceResponse(BaseResponse):
iops = "iops"
volumes_per_instance = "volumes_per_instance"
key_ebs_optimized = "{0}._{1}".format(key_ebs_config, ebs_optimized)
key_ebs_optimized = f"{key_ebs_config}._{ebs_optimized}"
# EbsOptimized config
if key_ebs_optimized in ebs_configuration:
instance_group.pop(key_ebs_optimized)
@ -450,12 +450,10 @@ class ElasticMapReduceResponse(BaseResponse):
# Ebs Blocks
ebs_blocks = []
idx = 1
keyfmt = "{0}._{1}.member.{{}}".format(
key_ebs_config, ebs_block_device_configs
)
keyfmt = f"{key_ebs_config}._{ebs_block_device_configs}.member.{{}}"
key = keyfmt.format(idx)
while self._has_key_prefix(key, ebs_configuration):
vlespc_keyfmt = "{0}._{1}._{{}}".format(key, volume_specification)
vlespc_keyfmt = f"{key}._{volume_specification}._{{}}"
vol_size = vlespc_keyfmt.format(size_in_gb)
vol_iops = vlespc_keyfmt.format(iops)
vol_type = vlespc_keyfmt.format(volume_type)
@ -478,7 +476,7 @@ class ElasticMapReduceResponse(BaseResponse):
volume_type
] = ebs_configuration.pop(vol_type)
per_instance = "{0}._{1}".format(key, volumes_per_instance)
per_instance = f"{key}._{volumes_per_instance}"
if per_instance in ebs_configuration:
instance_group.pop(per_instance)
ebs_block[volumes_per_instance] = int(

View File

@ -15,15 +15,15 @@ def random_id(size=13):
def random_cluster_id():
return "j-{0}".format(random_id())
return f"j-{random_id()}"
def random_step_id():
return "s-{0}".format(random_id())
return f"s-{random_id()}"
def random_instance_group_id():
return "i-{0}".format(random_id())
return f"i-{random_id()}"
def steps_from_query_string(querystring_dict):
@ -163,11 +163,11 @@ class ReleaseLabel(object):
@classmethod
def parse(cls, release_label):
if not release_label:
raise ValueError("Invalid empty ReleaseLabel: %r" % release_label)
raise ValueError(f"Invalid empty ReleaseLabel: {release_label}")
match = cls.version_re.match(release_label)
if not match:
raise ValueError("Invalid ReleaseLabel: %r" % release_label)
raise ValueError(f"Invalid ReleaseLabel: {release_label}")
major, minor, patch = match.groups()
@ -178,11 +178,11 @@ class ReleaseLabel(object):
return major, minor, patch
def __str__(self):
version = "emr-%d.%d.%d" % (self.major, self.minor, self.patch)
version = f"emr-{self.major}.{self.minor}.{self.patch}"
return version
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, str(self))
return f"{self.__class__.__name__}({str(self)})"
def __iter__(self):
return iter((self.major, self.minor, self.patch))
@ -425,7 +425,7 @@ class EmrSecurityGroupManager(object):
if group is None:
if group_id_or_name != defaults.group_name:
raise ValueError(
"The security group '{}' does not exist".format(group_id_or_name)
f"The security group '{group_id_or_name}' does not exist"
)
group = create_sg(defaults.group_name, defaults.description(), self.vpc_id)
return group

View File

@ -68,19 +68,10 @@ class Rule(CloudFormationModel):
@property
def arn(self):
event_bus_name = (
""
if self.event_bus_name == "default"
else "{}/".format(self.event_bus_name)
"" if self.event_bus_name == "default" else f"{self.event_bus_name}/"
)
return (
"arn:aws:events:{region}:{account_id}:rule/{event_bus_name}{name}".format(
region=self.region_name,
account_id=self.account_id,
event_bus_name=event_bus_name,
name=self.name,
)
)
return f"arn:aws:events:{self.region_name}:{self.account_id}:rule/{event_bus_name}{self.name}"
@property
def physical_resource_id(self):
@ -145,7 +136,7 @@ class Rule(CloudFormationModel):
group_id = target.get("SqsParameters", {}).get("MessageGroupId")
self._send_to_sqs_queue(arn.resource_id, event, group_id)
else:
raise NotImplementedError("Expr not defined for {0}".format(type(self)))
raise NotImplementedError(f"Expr not defined for {type(self)}")
def _parse_arn(self, arn):
# http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
@ -890,9 +881,7 @@ class EventPattern:
return all(numeric_matches)
else:
warnings.warn(
"'{}' filter logic unimplemented. defaulting to True".format(
filter_name
)
f"'{filter_name}' filter logic unimplemented. defaulting to True"
)
return True
@ -1001,7 +990,7 @@ class EventsBackend(BaseBackend):
event_bus = self.event_buses.get(event_bus_name)
if not event_bus:
raise ResourceNotFoundException(
"Event bus {} does not exist.".format(event_bus_name)
f"Event bus {event_bus_name} does not exist."
)
return event_bus
@ -1009,7 +998,7 @@ class EventsBackend(BaseBackend):
def _get_replay(self, name):
replay = self.replays.get(name)
if not replay:
raise ResourceNotFoundException("Replay {} does not exist.".format(name))
raise ResourceNotFoundException(f"Replay {name} does not exist.")
return replay
@ -1081,7 +1070,7 @@ class EventsBackend(BaseBackend):
def describe_rule(self, name):
rule = self.rules.get(name)
if not rule:
raise ResourceNotFoundException("Rule {} does not exist.".format(name))
raise ResourceNotFoundException(f"Rule {name} does not exist.")
return rule
def disable_rule(self, name):
@ -1158,8 +1147,7 @@ class EventsBackend(BaseBackend):
)
if invalid_arn:
raise ValidationException(
"Parameter {} is not valid. "
"Reason: Provided Arn is not in correct format.".format(invalid_arn)
f"Parameter {invalid_arn} is not valid. Reason: Provided Arn is not in correct format."
)
for target in targets:
@ -1171,16 +1159,14 @@ class EventsBackend(BaseBackend):
and not target.get("SqsParameters")
):
raise ValidationException(
"Parameter(s) SqsParameters must be specified for target: {}.".format(
target["Id"]
)
f"Parameter(s) SqsParameters must be specified for target: {target['Id']}."
)
rule = self.rules.get(name)
if not rule:
raise ResourceNotFoundException(
"Rule {0} does not exist on EventBus {1}.".format(name, event_bus_name)
f"Rule {name} does not exist on EventBus {event_bus_name}."
)
rule.put_targets(targets)
@ -1260,7 +1246,7 @@ class EventsBackend(BaseBackend):
if not rule:
raise ResourceNotFoundException(
"Rule {0} does not exist on EventBus {1}.".format(name, event_bus_name)
f"Rule {name} does not exist on EventBus {event_bus_name}."
)
rule.remove_targets(ids)
@ -1370,8 +1356,7 @@ class EventsBackend(BaseBackend):
def create_event_bus(self, name, event_source_name=None, tags=None):
if name in self.event_buses:
raise JsonRESTError(
"ResourceAlreadyExistsException",
"Event bus {} already exists.".format(name),
"ResourceAlreadyExistsException", f"Event bus {name} already exists."
)
if not event_source_name and "/" in name:
@ -1382,7 +1367,7 @@ class EventsBackend(BaseBackend):
if event_source_name and event_source_name not in self.event_sources:
raise JsonRESTError(
"ResourceNotFoundException",
"Event source {} does not exist.".format(event_source_name),
f"Event source {event_source_name} does not exist.",
)
event_bus = EventBus(self.account_id, self.region_name, name, tags=tags)
@ -1418,7 +1403,7 @@ class EventsBackend(BaseBackend):
if name in registry:
return self.tagger.list_tags_for_resource(registry[name].arn)
raise ResourceNotFoundException(
"Rule {0} does not exist on EventBus default.".format(name)
f"Rule {name} does not exist on EventBus default."
)
def tag_resource(self, arn, tags):
@ -1429,7 +1414,7 @@ class EventsBackend(BaseBackend):
self.tagger.tag_resource(registry[name].arn, tags)
return {}
raise ResourceNotFoundException(
"Rule {0} does not exist on EventBus default.".format(name)
f"Rule {name} does not exist on EventBus default."
)
def untag_resource(self, arn, tag_names):
@ -1440,23 +1425,21 @@ class EventsBackend(BaseBackend):
self.tagger.untag_resource_using_names(registry[name].arn, tag_names)
return {}
raise ResourceNotFoundException(
"Rule {0} does not exist on EventBus default.".format(name)
f"Rule {name} does not exist on EventBus default."
)
def create_archive(self, name, source_arn, description, event_pattern, retention):
if len(name) > 48:
raise ValidationException(
" 1 validation error detected: "
"Value '{}' at 'archiveName' failed to satisfy constraint: "
"Member must have length less than or equal to 48".format(name)
f"Value '{name}' at 'archiveName' failed to satisfy constraint: "
"Member must have length less than or equal to 48"
)
event_bus = self._get_event_bus(source_arn)
if name in self.archives:
raise ResourceAlreadyExistsException(
"Archive {} already exists.".format(name)
)
raise ResourceAlreadyExistsException(f"Archive {name} already exists.")
archive = Archive(
self.account_id,
@ -1471,7 +1454,7 @@ class EventsBackend(BaseBackend):
rule_event_pattern = json.loads(event_pattern or "{}")
rule_event_pattern["replay-name"] = [{"exists": False}]
rule_name = "Events-Archive-{}".format(name)
rule_name = f"Events-Archive-{name}"
rule = self.put_rule(
rule_name,
event_pattern=json.dumps(rule_event_pattern),
@ -1484,14 +1467,12 @@ class EventsBackend(BaseBackend):
[
{
"Id": rule.name,
"Arn": "arn:aws:events:{}:::".format(self.region_name),
"Arn": f"arn:aws:events:{self.region_name}:::",
"InputTransformer": {
"InputPathsMap": {},
"InputTemplate": json.dumps(
{
"archive-arn": "{0}:{1}".format(
archive.arn, archive.uuid
),
"archive-arn": f"{archive.arn}:{archive.uuid}",
"event": "<aws.events.event.json>",
"ingestion-time": "<aws.events.event.ingestion-time>",
}
@ -1509,7 +1490,7 @@ class EventsBackend(BaseBackend):
archive = self.archives.get(name)
if not archive:
raise ResourceNotFoundException("Archive {} does not exist.".format(name))
raise ResourceNotFoundException(f"Archive {name} does not exist.")
return archive.describe()
@ -1521,11 +1502,11 @@ class EventsBackend(BaseBackend):
)
if state and state not in Archive.VALID_STATES:
valid_states = ", ".join(Archive.VALID_STATES)
raise ValidationException(
"1 validation error detected: "
"Value '{0}' at 'state' failed to satisfy constraint: "
"Member must satisfy enum value set: "
"[{1}]".format(state, ", ".join(Archive.VALID_STATES))
f"Value '{state}' at 'state' failed to satisfy constraint: "
f"Member must satisfy enum value set: [{valid_states}]"
)
if [name_prefix, source_arn, state].count(None) == 3:
@ -1547,7 +1528,7 @@ class EventsBackend(BaseBackend):
archive = self.archives.get(name)
if not archive:
raise ResourceNotFoundException("Archive {} does not exist.".format(name))
raise ResourceNotFoundException(f"Archive {name} does not exist.")
archive.update(description, event_pattern, retention)
@ -1561,7 +1542,7 @@ class EventsBackend(BaseBackend):
archive = self.archives.get(name)
if not archive:
raise ResourceNotFoundException("Archive {} does not exist.".format(name))
raise ResourceNotFoundException(f"Archive {name} does not exist.")
archive.delete(self.account_id, self.region_name)
@ -1572,8 +1553,7 @@ class EventsBackend(BaseBackend):
event_bus_arn_pattern = r"^arn:aws:events:[a-zA-Z0-9-]+:\d{12}:event-bus/"
if not re.match(event_bus_arn_pattern, event_bus_arn):
raise ValidationException(
"Parameter Destination.Arn is not valid. "
"Reason: Must contain an event bus ARN."
"Parameter Destination.Arn is not valid. Reason: Must contain an event bus ARN."
)
self._get_event_bus(event_bus_arn)
@ -1582,8 +1562,7 @@ class EventsBackend(BaseBackend):
archive = self.archives.get(archive_name)
if not archive:
raise ValidationException(
"Parameter EventSourceArn is not valid. "
"Reason: Archive {} does not exist.".format(archive_name)
f"Parameter EventSourceArn is not valid. Reason: Archive {archive_name} does not exist."
)
if event_bus_arn != archive.source_arn:
@ -1599,9 +1578,7 @@ class EventsBackend(BaseBackend):
)
if name in self.replays:
raise ResourceAlreadyExistsException(
"Replay {} already exists.".format(name)
)
raise ResourceAlreadyExistsException(f"Replay {name} already exists.")
replay = Replay(
self.account_id,
@ -1638,11 +1615,9 @@ class EventsBackend(BaseBackend):
valid_states = sorted([item.value for item in ReplayState])
if state and state not in valid_states:
all_states = ", ".join(valid_states)
raise ValidationException(
"1 validation error detected: "
"Value '{0}' at 'state' failed to satisfy constraint: "
"Member must satisfy enum value set: "
"[{1}]".format(state, ", ".join(valid_states))
f"1 validation error detected: Value '{state}' at 'state' failed to satisfy constraint: Member must satisfy enum value set: [{all_states}]"
)
if [name_prefix, source_arn, state].count(None) == 3:
@ -1672,7 +1647,7 @@ class EventsBackend(BaseBackend):
ReplayState.COMPLETED,
]:
raise IllegalStatusException(
"Replay {} is not in a valid state for this operation.".format(name)
f"Replay {name} is not in a valid state for this operation."
)
replay.state = ReplayState.CANCELLED
@ -1694,9 +1669,7 @@ class EventsBackend(BaseBackend):
def update_connection(self, *, name, **kwargs):
connection = self.connections.get(name)
if not connection:
raise ResourceNotFoundException(
"Connection '{}' does not exist.".format(name)
)
raise ResourceNotFoundException(f"Connection '{name}' does not exist.")
for attr, value in kwargs.items():
if value is not None and hasattr(connection, attr):
@ -1724,9 +1697,7 @@ class EventsBackend(BaseBackend):
"""
connection = self.connections.get(name)
if not connection:
raise ResourceNotFoundException(
"Connection '{}' does not exist.".format(name)
)
raise ResourceNotFoundException(f"Connection '{name}' does not exist.")
return connection.describe()
@ -1748,9 +1719,7 @@ class EventsBackend(BaseBackend):
"""
connection = self.connections.pop(name, None)
if not connection:
raise ResourceNotFoundException(
"Connection '{}' does not exist.".format(name)
)
raise ResourceNotFoundException(f"Connection '{name}' does not exist.")
return connection.describe_short()
@ -1804,7 +1773,7 @@ class EventsBackend(BaseBackend):
destination = self.destinations.get(name)
if not destination:
raise ResourceNotFoundException(
"An api-destination '{}' does not exist.".format(name)
f"An api-destination '{name}' does not exist."
)
return destination.describe()
@ -1821,7 +1790,7 @@ class EventsBackend(BaseBackend):
destination = self.destinations.get(name)
if not destination:
raise ResourceNotFoundException(
"An api-destination '{}' does not exist.".format(name)
f"An api-destination '{name}' does not exist."
)
for attr, value in kwargs.items():
@ -1849,7 +1818,7 @@ class EventsBackend(BaseBackend):
destination = self.destinations.pop(name, None)
if not destination:
raise ResourceNotFoundException(
"An api-destination '{}' does not exist.".format(name)
f"An api-destination '{name}' does not exist."
)
return {}