Techdebt: Replace string-format with f-strings (for r* dirs) (#5691)

This commit is contained in:
Bert Blommers 2022-11-20 12:23:43 -01:00 committed by GitHub
parent 5274da5431
commit 2093a99485
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 108 additions and 224 deletions

View File

@ -69,7 +69,7 @@ class ResourceShare(BaseModel):
continue
else:
raise UnknownResourceException(
"Organization {} could not be found.".format(match.group(1))
f"Organization {match.group(1)} could not be found."
)
match = re.search(
@ -97,15 +97,12 @@ class ResourceShare(BaseModel):
continue
raise UnknownResourceException(
"OrganizationalUnit {} in unknown organization could not be found.".format(
match.group(2)
)
f"OrganizationalUnit {match.group(2)} in unknown organization could not be found."
)
if not re.match(r"^\d{12}$", principal):
raise InvalidParameterException(
"Principal ID {} is malformed. "
"Verify the ID and try again.".format(principal)
f"Principal ID {principal} is malformed. Verify the ID and try again."
)
for principal in principals:
@ -118,8 +115,7 @@ class ResourceShare(BaseModel):
)
if not match:
raise MalformedArnException(
"The specified resource ARN {} is not valid. "
"Verify the ARN and try again.".format(resource)
f"The specified resource ARN {resource} is not valid. Verify the ARN and try again."
)
if match.group(1) not in self.SHAREABLE_RESOURCES:
@ -180,8 +176,8 @@ class ResourceAccessManagerBackend(BaseBackend):
if owner not in ["SELF", "OTHER-ACCOUNTS"]:
raise InvalidParameterException(
"{} is not a valid resource owner. "
"Specify either SELF or OTHER-ACCOUNTS and try again.".format(owner)
f"{owner} is not a valid resource owner. "
"Specify either SELF or OTHER-ACCOUNTS and try again."
)
if owner == "OTHER-ACCOUNTS":
@ -201,9 +197,7 @@ class ResourceAccessManagerBackend(BaseBackend):
)
if not resource:
raise UnknownResourceException(
"ResourceShare {} could not be found.".format(arn)
)
raise UnknownResourceException(f"ResourceShare {arn} could not be found.")
resource.update(**kwargs)
response = resource.describe()
@ -217,9 +211,7 @@ class ResourceAccessManagerBackend(BaseBackend):
)
if not resource:
raise UnknownResourceException(
"ResourceShare {} could not be found.".format(arn)
)
raise UnknownResourceException(f"ResourceShare {arn} could not be found.")
resource.delete()

View File

@ -22,8 +22,7 @@ class RDSClientError(RESTError):
class DBInstanceNotFoundError(RDSClientError):
def __init__(self, database_identifier):
super().__init__(
"DBInstanceNotFound",
"DBInstance {0} not found.".format(database_identifier),
"DBInstanceNotFound", f"DBInstance {database_identifier} not found."
)
@ -69,9 +68,7 @@ class InvalidDBClusterStateFaultError(RDSClientError):
def __init__(self, database_identifier):
super().__init__(
"InvalidDBClusterStateFault",
"Invalid DB type, when trying to perform StopDBInstance on {0}e. See AWS RDS documentation on rds.stop_db_instance".format(
database_identifier
),
f"Invalid DB type, when trying to perform StopDBInstance on {database_identifier}e. See AWS RDS documentation on rds.stop_db_instance",
)
@ -83,8 +80,7 @@ class InvalidDBInstanceStateError(RDSClientError):
else "stopped, it cannot be started"
)
super().__init__(
"InvalidDBInstanceState",
"Instance {} is not {}.".format(database_identifier, estate),
"InvalidDBInstanceState", f"Instance {database_identifier} is not {estate}."
)
@ -100,9 +96,7 @@ class DBSnapshotAlreadyExistsError(RDSClientError):
def __init__(self, database_snapshot_identifier):
super().__init__(
"DBSnapshotAlreadyExists",
"Cannot create the snapshot because a snapshot with the identifier {} already exists.".format(
database_snapshot_identifier
),
f"Cannot create the snapshot because a snapshot with the identifier {database_snapshot_identifier} already exists.",
)
@ -124,8 +118,7 @@ class InvalidDBClusterStateFault(RDSClientError):
class DBClusterNotFoundError(RDSClientError):
def __init__(self, cluster_identifier):
super().__init__(
"DBClusterNotFoundFault",
"DBCluster {} not found.".format(cluster_identifier),
"DBClusterNotFoundFault", f"DBCluster {cluster_identifier} not found."
)
@ -133,7 +126,7 @@ class DBClusterSnapshotNotFoundError(RDSClientError):
def __init__(self, snapshot_identifier):
super().__init__(
"DBClusterSnapshotNotFoundFault",
"DBClusterSnapshot {} not found.".format(snapshot_identifier),
f"DBClusterSnapshot {snapshot_identifier} not found.",
)
@ -141,9 +134,7 @@ class DBClusterSnapshotAlreadyExistsError(RDSClientError):
def __init__(self, database_snapshot_identifier):
super().__init__(
"DBClusterSnapshotAlreadyExistsFault",
"Cannot create the snapshot because a snapshot with the identifier {} already exists.".format(
database_snapshot_identifier
),
f"Cannot create the snapshot because a snapshot with the identifier {database_snapshot_identifier} already exists.",
)
@ -151,9 +142,7 @@ class ExportTaskAlreadyExistsError(RDSClientError):
def __init__(self, export_task_identifier):
super().__init__(
"ExportTaskAlreadyExistsFault",
"Cannot start export task because a task with the identifier {} already exists.".format(
export_task_identifier
),
f"Cannot start export task because a task with the identifier {export_task_identifier} already exists.",
)
@ -161,9 +150,7 @@ class ExportTaskNotFoundError(RDSClientError):
def __init__(self, export_task_identifier):
super().__init__(
"ExportTaskNotFoundFault",
"Cannot cancel export task because a task with the identifier {} is not exist.".format(
export_task_identifier
),
f"Cannot cancel export task because a task with the identifier {export_task_identifier} is not exist.",
)
@ -171,9 +158,7 @@ class InvalidExportSourceStateError(RDSClientError):
def __init__(self, status):
super().__init__(
"InvalidExportSourceStateFault",
"Export source should be 'available' but current status is {}.".format(
status
),
f"Export source should be 'available' but current status is {status}.",
)
@ -181,13 +166,12 @@ class SubscriptionAlreadyExistError(RDSClientError):
def __init__(self, subscription_name):
super().__init__(
"SubscriptionAlreadyExistFault",
"Subscription {} already exists.".format(subscription_name),
f"Subscription {subscription_name} already exists.",
)
class SubscriptionNotFoundError(RDSClientError):
def __init__(self, subscription_name):
super().__init__(
"SubscriptionNotFoundFault",
"Subscription {} not found.".format(subscription_name),
"SubscriptionNotFoundFault", f"Subscription {subscription_name} not found."
)

View File

@ -504,7 +504,7 @@ class Database(CloudFormationModel):
db_family,
db_parameter_group_name,
) = self.default_db_parameter_group_details()
description = "Default parameter group for {0}".format(db_family)
description = f"Default parameter group for {db_family}"
return [
DBParameterGroup(
account_id=self.account_id,
@ -523,16 +523,16 @@ class Database(CloudFormationModel):
return [backend.db_parameter_groups[self.db_parameter_group_name]]
def is_default_parameter_group(self, param_group_name):
return param_group_name.startswith("default.%s" % self.engine.lower())
return param_group_name.startswith(f"default.{self.engine.lower()}")
def default_db_parameter_group_details(self):
if not self.engine_version:
return (None, None)
minor_engine_version = ".".join(str(self.engine_version).rsplit(".")[:-1])
db_family = "{0}{1}".format(self.engine.lower(), minor_engine_version)
db_family = f"{self.engine.lower()}{minor_engine_version}"
return db_family, "default.{0}".format(db_family)
return db_family, f"default.{db_family}"
def to_xml(self):
template = Template(
@ -662,9 +662,7 @@ class Database(CloudFormationModel):
@property
def address(self):
return "{0}.aaaaaaaaaa.{1}.rds.amazonaws.com".format(
self.db_instance_identifier, self.region_name
)
return f"{self.db_instance_identifier}.aaaaaaaaaa.{self.region_name}.rds.amazonaws.com"
def add_replica(self, replica):
if self.region_name != replica.region_name:
@ -1599,12 +1597,10 @@ class RDSBackend(BaseBackend):
"sqlserver-ex": ["10.50", "11.00"],
"sqlserver-web": ["10.50", "11.00"],
}
if option_group_kwargs["name"] in self.option_groups:
if option_group_id in self.option_groups:
raise RDSClientError(
"OptionGroupAlreadyExistsFault",
"An option group named {0} already exists.".format(
option_group_kwargs["name"]
),
f"An option group named {option_group_id} already exists.",
)
if (
"description" not in option_group_kwargs
@ -1624,10 +1620,7 @@ class RDSBackend(BaseBackend):
):
raise RDSClientError(
"InvalidParameterCombination",
"Cannot find major version {0} for {1}".format(
option_group_kwargs["major_engine_version"],
option_group_kwargs["engine_name"],
),
f"Cannot find major version {option_group_kwargs['major_engine_version']} for {option_group_kwargs['engine_name']}",
)
option_group = OptionGroup(**option_group_kwargs)
self.option_groups[option_group_id] = option_group
@ -1710,7 +1703,7 @@ class RDSBackend(BaseBackend):
if engine_name not in default_option_group_options:
raise RDSClientError(
"InvalidParameterValue", "Invalid DB engine: {0}".format(engine_name)
"InvalidParameterValue", f"Invalid DB engine: {engine_name}"
)
if (
major_engine_version
@ -1718,9 +1711,7 @@ class RDSBackend(BaseBackend):
):
raise RDSClientError(
"InvalidParameterCombination",
"Cannot find major version {0} for {1}".format(
major_engine_version, engine_name
),
f"Cannot find major version {major_engine_version} for {engine_name}",
)
if major_engine_version:
return default_option_group_options[engine_name][major_engine_version]
@ -1744,12 +1735,10 @@ class RDSBackend(BaseBackend):
def create_db_parameter_group(self, db_parameter_group_kwargs):
db_parameter_group_id = db_parameter_group_kwargs["name"]
if db_parameter_group_kwargs["name"] in self.db_parameter_groups:
if db_parameter_group_id in self.db_parameter_groups:
raise RDSClientError(
"DBParameterGroupAlreadyExistsFault",
"A DB parameter group named {0} already exists.".format(
db_parameter_group_kwargs["name"]
),
f"A DB parameter group named {db_parameter_group_id} already exists.",
)
if not db_parameter_group_kwargs.get("description"):
raise RDSClientError(
@ -2062,7 +2051,7 @@ class RDSBackend(BaseBackend):
return self.subnet_groups[resource_name].get_tags()
else:
raise RDSClientError(
"InvalidParameterValue", "Invalid resource name: {0}".format(arn)
"InvalidParameterValue", f"Invalid resource name: {arn}"
)
return []
@ -2102,7 +2091,7 @@ class RDSBackend(BaseBackend):
return self.subnet_groups[resource_name].remove_tags(tag_keys)
else:
raise RDSClientError(
"InvalidParameterValue", "Invalid resource name: {0}".format(arn)
"InvalidParameterValue", f"Invalid resource name: {arn}"
)
def add_tags_to_resource(self, arn, tags):
@ -2141,7 +2130,7 @@ class RDSBackend(BaseBackend):
return self.subnet_groups[resource_name].add_tags(tags)
else:
raise RDSClientError(
"InvalidParameterValue", "Invalid resource name: {0}".format(arn)
"InvalidParameterValue", f"Invalid resource name: {arn}"
)
@staticmethod

View File

@ -60,7 +60,7 @@ class RDSResponse(BaseResponse):
"tags": list(),
"deletion_protection": self._get_bool_param("DeletionProtection"),
}
args["tags"] = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
args["tags"] = self.unpack_list_params("Tags", "Tag")
return args
def _get_modify_db_cluster_kwargs(self):
@ -110,7 +110,7 @@ class RDSResponse(BaseResponse):
"tags": list(),
"deletion_protection": self._get_bool_param("DeletionProtection"),
}
args["tags"] = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
args["tags"] = self.unpack_list_params("Tags", "Tag")
return args
def _get_db_replica_kwargs(self):
@ -141,7 +141,7 @@ class RDSResponse(BaseResponse):
"description": self._get_param("Description"),
"family": self._get_param("DBParameterGroupFamily"),
"name": self._get_param("DBParameterGroupName"),
"tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")),
"tags": self.unpack_list_params("Tags", "Tag"),
}
def _get_db_cluster_kwargs(self):
@ -169,7 +169,7 @@ class RDSResponse(BaseResponse):
"db_cluster_instance_class": self._get_param("DBClusterInstanceClass"),
"enable_http_endpoint": self._get_param("EnableHttpEndpoint"),
"copy_tags_to_snapshot": self._get_param("CopyTagsToSnapshot"),
"tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")),
"tags": self.unpack_list_params("Tags", "Tag"),
}
def _get_export_task_kwargs(self):
@ -180,7 +180,7 @@ class RDSResponse(BaseResponse):
"iam_role_arn": self._get_param("IamRoleArn"),
"kms_key_id": self._get_param("KmsKeyId"),
"s3_prefix": self._get_param("S3Prefix"),
"export_only": self.unpack_list_params("ExportOnly.member"),
"export_only": self.unpack_list_params("ExportOnly", "member"),
}
def _get_event_subscription_kwargs(self):
@ -189,33 +189,16 @@ class RDSResponse(BaseResponse):
"sns_topic_arn": self._get_param("SnsTopicArn"),
"source_type": self._get_param("SourceType"),
"event_categories": self.unpack_list_params(
"EventCategories.EventCategory"
"EventCategories", "EventCategory"
),
"source_ids": self.unpack_list_params("SourceIds.SourceId"),
"source_ids": self.unpack_list_params("SourceIds", "SourceId"),
"enabled": self._get_param("Enabled"),
"tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")),
"tags": self.unpack_list_params("Tags", "Tag"),
}
def unpack_complex_list_params(self, label, names):
unpacked_list = list()
count = 1
while self._get_param("{0}.{1}.{2}".format(label, count, names[0])):
param = dict()
for i in range(len(names)):
param[names[i]] = self._get_param(
"{0}.{1}.{2}".format(label, count, names[i])
)
unpacked_list.append(param)
count += 1
return unpacked_list
def unpack_list_params(self, label):
unpacked_list = list()
count = 1
while self._get_param("{0}.{1}".format(label, count)):
unpacked_list.append(self._get_param("{0}.{1}".format(label, count)))
count += 1
return unpacked_list
def unpack_list_params(self, label, child_label):
root = self._get_multi_param_dict(label) or {}
return root.get(child_label, [])
def create_db_instance(self):
db_kwargs = self._get_db_kwargs()
@ -284,7 +267,7 @@ class RDSResponse(BaseResponse):
def create_db_snapshot(self):
db_instance_identifier = self._get_param("DBInstanceIdentifier")
db_snapshot_identifier = self._get_param("DBSnapshotIdentifier")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
snapshot = self.backend.create_db_snapshot(
db_instance_identifier, db_snapshot_identifier, tags
)
@ -294,7 +277,7 @@ class RDSResponse(BaseResponse):
def copy_db_snapshot(self):
source_snapshot_identifier = self._get_param("SourceDBSnapshotIdentifier")
target_snapshot_identifier = self._get_param("TargetDBSnapshotIdentifier")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
snapshot = self.backend.copy_database_snapshot(
source_snapshot_identifier, target_snapshot_identifier, tags
)
@ -335,14 +318,14 @@ class RDSResponse(BaseResponse):
def add_tags_to_resource(self):
arn = self._get_param("ResourceName")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
tags = self.backend.add_tags_to_resource(arn, tags)
template = self.response_template(ADD_TAGS_TO_RESOURCE_TEMPLATE)
return template.render(tags=tags)
def remove_tags_from_resource(self):
arn = self._get_param("ResourceName")
tag_keys = self.unpack_list_params("TagKeys.member")
tag_keys = self.unpack_list_params("TagKeys", "member")
self.backend.remove_tags_from_resource(arn, tag_keys)
template = self.response_template(REMOVE_TAGS_FROM_RESOURCE_TEMPLATE)
return template.render()
@ -365,7 +348,7 @@ class RDSResponse(BaseResponse):
def create_db_security_group(self):
group_name = self._get_param("DBSecurityGroupName")
description = self._get_param("DBSecurityGroupDescription")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
security_group = self.backend.create_db_security_group(
group_name, description, tags
)
@ -397,7 +380,7 @@ class RDSResponse(BaseResponse):
subnet_name = self._get_param("DBSubnetGroupName")
description = self._get_param("DBSubnetGroupDescription")
subnet_ids = self._get_multi_param("SubnetIds.SubnetIdentifier")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
subnets = [
ec2_backends[self.current_account][self.region].get_subnet(subnet_id)
for subnet_id in subnet_ids
@ -466,27 +449,22 @@ class RDSResponse(BaseResponse):
option_group_name = self._get_param("OptionGroupName")
count = 1
options_to_include = []
while self._get_param("OptionsToInclude.member.{0}.OptionName".format(count)):
# TODO: This can probably be refactored with a single call to super.get_multi_param, but there are not enough tests (yet) to verify this
while self._get_param(f"OptionsToInclude.member.{count}.OptionName"):
options_to_include.append(
{
"Port": self._get_param(
"OptionsToInclude.member.{0}.Port".format(count)
),
"Port": self._get_param(f"OptionsToInclude.member.{count}.Port"),
"OptionName": self._get_param(
"OptionsToInclude.member.{0}.OptionName".format(count)
f"OptionsToInclude.member.{count}.OptionName"
),
"DBSecurityGroupMemberships": self._get_param(
"OptionsToInclude.member.{0}.DBSecurityGroupMemberships".format(
count
)
f"OptionsToInclude.member.{count}.DBSecurityGroupMemberships"
),
"OptionSettings": self._get_param(
"OptionsToInclude.member.{0}.OptionSettings".format(count)
f"OptionsToInclude.member.{count}.OptionSettings"
),
"VpcSecurityGroupMemberships": self._get_param(
"OptionsToInclude.member.{0}.VpcSecurityGroupMemberships".format(
count
)
f"OptionsToInclude.member.{count}.VpcSecurityGroupMemberships"
),
}
)
@ -494,10 +472,8 @@ class RDSResponse(BaseResponse):
count = 1
options_to_remove = []
while self._get_param("OptionsToRemove.member.{0}".format(count)):
options_to_remove.append(
self._get_param("OptionsToRemove.member.{0}".format(count))
)
while self._get_param(f"OptionsToRemove.member.{count}"):
options_to_remove.append(self._get_param(f"OptionsToRemove.member.{count}"))
count += 1
option_group = self.backend.modify_option_group(
option_group_name, options_to_include, options_to_remove
@ -601,7 +577,7 @@ class RDSResponse(BaseResponse):
def create_db_cluster_snapshot(self):
db_cluster_identifier = self._get_param("DBClusterIdentifier")
db_snapshot_identifier = self._get_param("DBClusterSnapshotIdentifier")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
snapshot = self.backend.create_db_cluster_snapshot(
db_cluster_identifier, db_snapshot_identifier, tags
)
@ -615,7 +591,7 @@ class RDSResponse(BaseResponse):
target_snapshot_identifier = self._get_param(
"TargetDBClusterSnapshotIdentifier"
)
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
snapshot = self.backend.copy_cluster_snapshot(
source_snapshot_identifier, target_snapshot_identifier, tags
)

View File

@ -94,14 +94,12 @@ def validate_filters(filters, filter_defs):
for filter_name, filter_values in filters.items():
filter_def = filter_defs.get(filter_name)
if filter_def is None:
raise KeyError("Unrecognized filter name: {}".format(filter_name))
raise KeyError(f"Unrecognized filter name: {filter_name}")
if not filter_values:
raise ValueError(
"The list of {} must not be empty.".format(filter_def.description)
)
raise ValueError(f"The list of {filter_def.description} must not be empty.")
if filter_def.attrs_to_check is None:
raise NotImplementedError(
"{} filter has not been implemented in Moto yet.".format(filter_name)
f"{filter_name} filter has not been implemented in Moto yet."
)

View File

@ -15,16 +15,13 @@ class RedshiftClientError(JsonRESTError):
class ClusterNotFoundError(RedshiftClientError):
def __init__(self, cluster_identifier):
super().__init__(
"ClusterNotFound", "Cluster {0} not found.".format(cluster_identifier)
)
super().__init__("ClusterNotFound", f"Cluster {cluster_identifier} not found.")
class ClusterSubnetGroupNotFoundError(RedshiftClientError):
def __init__(self, subnet_identifier):
super().__init__(
"ClusterSubnetGroupNotFound",
"Subnet group {0} not found.".format(subnet_identifier),
"ClusterSubnetGroupNotFound", f"Subnet group {subnet_identifier} not found."
)
@ -32,7 +29,7 @@ class ClusterSecurityGroupNotFoundError(RedshiftClientError):
def __init__(self, group_identifier):
super().__init__(
"ClusterSecurityGroupNotFound",
"Security group {0} not found.".format(group_identifier),
f"Security group {group_identifier} not found.",
)
@ -40,15 +37,13 @@ class ClusterParameterGroupNotFoundError(RedshiftClientError):
def __init__(self, group_identifier):
super().__init__(
"ClusterParameterGroupNotFound",
"Parameter group {0} not found.".format(group_identifier),
f"Parameter group {group_identifier} not found.",
)
class InvalidSubnetError(RedshiftClientError):
def __init__(self, subnet_identifier):
super().__init__(
"InvalidSubnet", "Subnet {0} not found.".format(subnet_identifier)
)
super().__init__("InvalidSubnet", f"Subnet {subnet_identifier} not found.")
class SnapshotCopyGrantAlreadyExistsFaultError(RedshiftClientError):
@ -56,7 +51,7 @@ class SnapshotCopyGrantAlreadyExistsFaultError(RedshiftClientError):
super().__init__(
"SnapshotCopyGrantAlreadyExistsFault",
"Cannot create the snapshot copy grant because a grant "
"with the identifier '{0}' already exists".format(snapshot_copy_grant_name),
f"with the identifier '{snapshot_copy_grant_name}' already exists",
)
@ -64,15 +59,14 @@ class SnapshotCopyGrantNotFoundFaultError(RedshiftClientError):
def __init__(self, snapshot_copy_grant_name):
super().__init__(
"SnapshotCopyGrantNotFoundFault",
"Snapshot copy grant not found: {0}".format(snapshot_copy_grant_name),
f"Snapshot copy grant not found: {snapshot_copy_grant_name}",
)
class ClusterSnapshotNotFoundError(RedshiftClientError):
def __init__(self, snapshot_identifier):
super().__init__(
"ClusterSnapshotNotFound",
"Snapshot {0} not found.".format(snapshot_identifier),
"ClusterSnapshotNotFound", f"Snapshot {snapshot_identifier} not found."
)
@ -81,7 +75,7 @@ class ClusterSnapshotAlreadyExistsError(RedshiftClientError):
super().__init__(
"ClusterSnapshotAlreadyExists",
"Cannot create the snapshot because a snapshot with the "
"identifier {0} already exists".format(snapshot_identifier),
f"identifier {snapshot_identifier} already exists",
)
@ -96,9 +90,9 @@ class ResourceNotFoundFaultError(RedshiftClientError):
def __init__(self, resource_type=None, resource_name=None, message=None):
if resource_type and not resource_name:
msg = "resource of type '{0}' not found.".format(resource_type)
msg = f"resource of type '{resource_type}' not found."
else:
msg = "{0} ({1}) not found.".format(resource_type, resource_name)
msg = f"{resource_type} ({resource_name}) not found."
if message:
msg = message
super().__init__("ResourceNotFoundFault", msg)
@ -108,9 +102,7 @@ class SnapshotCopyDisabledFaultError(RedshiftClientError):
def __init__(self, cluster_identifier):
super().__init__(
"SnapshotCopyDisabledFault",
"Cannot modify retention period because snapshot copy is disabled on Cluster {0}.".format(
cluster_identifier
),
f"Cannot modify retention period because snapshot copy is disabled on Cluster {cluster_identifier}.",
)
@ -118,9 +110,7 @@ class SnapshotCopyAlreadyDisabledFaultError(RedshiftClientError):
def __init__(self, cluster_identifier):
super().__init__(
"SnapshotCopyAlreadyDisabledFault",
"Snapshot Copy is already disabled on Cluster {0}.".format(
cluster_identifier
),
f"Snapshot Copy is already disabled on Cluster {cluster_identifier}.",
)
@ -128,9 +118,7 @@ class SnapshotCopyAlreadyEnabledFaultError(RedshiftClientError):
def __init__(self, cluster_identifier):
super().__init__(
"SnapshotCopyAlreadyEnabledFault",
"Snapshot Copy is already enabled on Cluster {0}.".format(
cluster_identifier
),
f"Snapshot Copy is already enabled on Cluster {cluster_identifier}.",
)
@ -161,7 +149,5 @@ class InvalidClusterSnapshotStateFaultError(RedshiftClientError):
def __init__(self, snapshot_identifier):
super().__init__(
"InvalidClusterSnapshotStateFault",
"Cannot delete the snapshot {0} because only manual snapshots may be deleted".format(
snapshot_identifier
),
f"Cannot delete the snapshot {snapshot_identifier} because only manual snapshots may be deleted",
)

View File

@ -219,9 +219,7 @@ class Cluster(TaggableResourceMixin, CloudFormationModel):
@property
def endpoint(self):
return "{0}.cg034hpkmmjt.{1}.redshift.amazonaws.com".format(
self.cluster_identifier, self.region
)
return f"{self.cluster_identifier}.cg034hpkmmjt.{self.region}.redshift.amazonaws.com"
@property
def security_groups(self):
@ -524,10 +522,7 @@ class Snapshot(TaggableResourceMixin, BaseModel):
@property
def resource_id(self):
return "{cluster_id}/{snapshot_id}".format(
cluster_id=self.cluster.cluster_identifier,
snapshot_id=self.snapshot_identifier,
)
return f"{self.cluster.cluster_identifier}/{self.snapshot_identifier}"
def to_json(self):
return {
@ -604,7 +599,7 @@ class RedshiftBackend(BaseBackend):
)
if kwargs["destination_region"] == self.region_name:
raise UnknownSnapshotCopyRegionFaultError(
"Invalid region {}".format(self.region_name)
f"Invalid region {self.region_name}"
)
status = {
"DestinationRegion": kwargs["destination_region"],
@ -641,9 +636,7 @@ class RedshiftBackend(BaseBackend):
raise ClusterAlreadyExistsFaultError()
cluster = Cluster(self, **cluster_kwargs)
self.clusters[cluster_identifier] = cluster
snapshot_id = "rs:{}-{}".format(
cluster_identifier, datetime.datetime.utcnow().strftime("%Y-%m-%d-%H-%M")
)
snapshot_id = f"rs:{cluster_identifier}-{datetime.datetime.utcnow().strftime('%Y-%m-%d-%H-%M')}"
# Automated snapshots don't copy over the tags
self.create_cluster_snapshot(
cluster_identifier,
@ -972,9 +965,8 @@ class RedshiftBackend(BaseBackend):
resources = self.RESOURCE_TYPE_MAP.get(resource_type)
if resources is None:
message = (
"Tagging is not supported for this type of resource: '{0}' "
"(the ARN is potentially malformed, please check the ARN "
"documentation for more information)".format(resource_type)
f"Tagging is not supported for this type of resource: '{resource_type}' "
"(the ARN is potentially malformed, please check the ARN documentation for more information)"
)
raise ResourceNotFoundFaultError(message=message)
try:

View File

@ -67,26 +67,9 @@ class RedshiftResponse(BaseResponse):
body = convert_json_error_to_xml(body)
return status, headers, body
def unpack_complex_list_params(self, label, names):
unpacked_list = list()
count = 1
while self._get_param("{0}.{1}.{2}".format(label, count, names[0])):
param = dict()
for i in range(len(names)):
param[names[i]] = self._get_param(
"{0}.{1}.{2}".format(label, count, names[i])
)
unpacked_list.append(param)
count += 1
return unpacked_list
def unpack_list_params(self, label):
unpacked_list = list()
count = 1
while self._get_param("{0}.{1}".format(label, count)):
unpacked_list.append(self._get_param("{0}.{1}".format(label, count)))
count += 1
return unpacked_list
def unpack_list_params(self, label, child_label):
root = self._get_multi_param_dict(label) or {}
return root.get(child_label, [])
def _get_cluster_security_groups(self):
cluster_security_groups = self._get_multi_param("ClusterSecurityGroups.member")
@ -144,7 +127,7 @@ class RedshiftResponse(BaseResponse):
"publicly_accessible": self._get_param("PubliclyAccessible"),
"encrypted": self._get_param("Encrypted"),
"region_name": self.region,
"tags": self.unpack_complex_list_params("Tags.Tag", ("Key", "Value")),
"tags": self.unpack_list_params("Tags", "Tag"),
"iam_roles_arn": self._get_iam_roles(),
"enhanced_vpc_routing": self._get_param("EnhancedVpcRouting"),
"kms_key_id": self._get_param("KmsKeyId"),
@ -327,7 +310,7 @@ class RedshiftResponse(BaseResponse):
cluster_subnet_group_name = self._get_param("ClusterSubnetGroupName")
description = self._get_param("Description")
subnet_ids = self._get_subnet_ids()
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
subnet_group = self.redshift_backend.create_cluster_subnet_group(
cluster_subnet_group_name=cluster_subnet_group_name,
@ -388,7 +371,7 @@ class RedshiftResponse(BaseResponse):
def create_cluster_security_group(self):
cluster_security_group_name = self._get_param("ClusterSecurityGroupName")
description = self._get_param("Description")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
security_group = self.redshift_backend.create_cluster_security_group(
cluster_security_group_name=cluster_security_group_name,
@ -477,7 +460,7 @@ class RedshiftResponse(BaseResponse):
cluster_parameter_group_name = self._get_param("ParameterGroupName")
group_family = self._get_param("ParameterGroupFamily")
description = self._get_param("Description")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
parameter_group = self.redshift_backend.create_cluster_parameter_group(
cluster_parameter_group_name, group_family, description, self.region, tags
@ -537,7 +520,7 @@ class RedshiftResponse(BaseResponse):
def create_cluster_snapshot(self):
cluster_identifier = self._get_param("ClusterIdentifier")
snapshot_identifier = self._get_param("SnapshotIdentifier")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
snapshot = self.redshift_backend.create_cluster_snapshot(
cluster_identifier, snapshot_identifier, self.region, tags
@ -651,7 +634,7 @@ class RedshiftResponse(BaseResponse):
def create_tags(self):
resource_name = self._get_param("ResourceName")
tags = self.unpack_complex_list_params("Tags.Tag", ("Key", "Value"))
tags = self.unpack_list_params("Tags", "Tag")
self.redshift_backend.create_tags(resource_name, tags)
@ -685,7 +668,7 @@ class RedshiftResponse(BaseResponse):
def delete_tags(self):
resource_name = self._get_param("ResourceName")
tag_keys = self.unpack_list_params("TagKeys.TagKey")
tag_keys = self.unpack_list_params("TagKeys", "TagKey")
self.redshift_backend.delete_tags(resource_name, tag_keys)

View File

@ -34,9 +34,7 @@ class FakeResourceGroup(BaseModel):
@staticmethod
def _format_error(key, value, constraint):
return "Value '{value}' at '{key}' failed to satisfy constraint: {constraint}".format(
constraint=constraint, key=key, value=value
)
return f"Value '{value}' at '{key}' failed to satisfy constraint: {constraint}"
def _raise_errors(self):
if self.errors:
@ -44,9 +42,7 @@ class FakeResourceGroup(BaseModel):
plural = "s" if len(self.errors) > 1 else ""
errors = "; ".join(self.errors)
raise BadRequestException(
"{errors_len} validation error{plural} detected: {errors}".format(
errors_len=errors_len, plural=plural, errors=errors
)
f"{errors_len} validation error{plural} detected: {errors}"
)
def _validate_description(self, value):

View File

@ -174,9 +174,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
): # Skip if no tags, or invalid filter
continue
yield {
"ResourceARN": "arn:aws:ec2:{0}::image/{1}".format(
self.region_name, ami.id
),
"ResourceARN": f"arn:aws:ec2:{self.region_name}::image/{ami.id}",
"Tags": tags,
}
@ -195,9 +193,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
): # Skip if no tags, or invalid filter
continue
yield {
"ResourceARN": "arn:aws:ec2:{0}::instance/{1}".format(
self.region_name, instance.id
),
"ResourceARN": f"arn:aws:ec2:{self.region_name}::instance/{instance.id}",
"Tags": tags,
}
@ -215,9 +211,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
): # Skip if no tags, or invalid filter
continue
yield {
"ResourceARN": "arn:aws:ec2:{0}::network-interface/{1}".format(
self.region_name, eni.id
),
"ResourceARN": f"arn:aws:ec2:{self.region_name}::network-interface/{eni.id}",
"Tags": tags,
}
@ -238,9 +232,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
): # Skip if no tags, or invalid filter
continue
yield {
"ResourceARN": "arn:aws:ec2:{0}::security-group/{1}".format(
self.region_name, sg.id
),
"ResourceARN": f"arn:aws:ec2:{self.region_name}::security-group/{sg.id}",
"Tags": tags,
}
@ -258,9 +250,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
): # Skip if no tags, or invalid filter
continue
yield {
"ResourceARN": "arn:aws:ec2:{0}::snapshot/{1}".format(
self.region_name, snapshot.id
),
"ResourceARN": f"arn:aws:ec2:{self.region_name}::snapshot/{snapshot.id}",
"Tags": tags,
}
@ -280,9 +270,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
): # Skip if no tags, or invalid filter
continue
yield {
"ResourceARN": "arn:aws:ec2:{0}::volume/{1}".format(
self.region_name, volume.id
),
"ResourceARN": f"arn:aws:ec2:{self.region_name}::volume/{volume.id}",
"Tags": tags,
}
@ -300,7 +288,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
if not tag_filter(tags): # Skip if no tags, or invalid filter
continue
yield {"ResourceARN": "{0}".format(elb.arn), "Tags": tags}
yield {"ResourceARN": f"{elb.arn}", "Tags": tags}
# ELB Target Group, resource type elasticloadbalancing:targetgroup
if (
@ -315,7 +303,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
if not tag_filter(tags): # Skip if no tags, or invalid filter
continue
yield {"ResourceARN": "{0}".format(target_group.arn), "Tags": tags}
yield {"ResourceARN": f"{target_group.arn}", "Tags": tags}
# EMR Cluster
@ -336,7 +324,7 @@ class ResourceGroupsTaggingAPIBackend(BaseBackend):
if not tag_filter(tags): # Skip if no tags, or invalid filter
continue
yield {"ResourceARN": "{0}".format(kms_key.arn), "Tags": tags}
yield {"ResourceARN": f"{kms_key.arn}", "Tags": tags}
# RDS Instance
if (