Techdebt: Replace string-format with f-strings (for i* dirs) (#5682)

This commit is contained in:
Bert Blommers 2022-11-18 22:09:16 -01:00 committed by GitHub
parent 310ef4885a
commit 2c8e795991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 103 additions and 182 deletions

View File

@ -73,9 +73,7 @@ class IAMUserAccessKey:
@property @property
def arn(self): def arn(self):
return "arn:aws:iam::{account_id}:user/{iam_user_name}".format( return f"arn:aws:iam::{self.account_id}:user/{self._owner_user_name}"
account_id=self.account_id, iam_user_name=self._owner_user_name
)
def create_credentials(self): def create_credentials(self):
return Credentials(self._access_key_id, self._secret_access_key) return Credentials(self._access_key_id, self._secret_access_key)
@ -135,13 +133,7 @@ class AssumedRoleAccessKey(object):
@property @property
def arn(self): def arn(self):
return ( return f"arn:aws:sts::{self.account_id}:assumed-role/{self._owner_role_name}/{self._session_name}"
"arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name}".format(
account_id=self.account_id,
role_name=self._owner_role_name,
session_name=self._session_name,
)
)
def create_credentials(self): def create_credentials(self):
return Credentials( return Credentials(
@ -175,13 +167,7 @@ class CreateAccessKeyFailure(Exception):
class IAMRequestBase(object, metaclass=ABCMeta): class IAMRequestBase(object, metaclass=ABCMeta):
def __init__(self, account_id, method, path, data, headers): def __init__(self, account_id, method, path, data, headers):
log.debug( log.debug(
"Creating {class_name} with method={method}, path={path}, data={data}, headers={headers}".format( f"Creating {self.__class__.__name__} with method={method}, path={path}, data={data}, headers={headers}"
class_name=self.__class__.__name__,
method=method,
path=path,
data=data,
headers=headers,
)
) )
self.account_id = account_id self.account_id = account_id
self._method = method self._method = method
@ -410,7 +396,7 @@ class IAMPolicyStatement(object):
@staticmethod @staticmethod
def _match(pattern, string): def _match(pattern, string):
pattern = pattern.replace("*", ".*") pattern = pattern.replace("*", ".*")
pattern = "^{pattern}$".format(pattern=pattern) pattern = f"^{pattern}$"
return re.match(pattern, string) return re.match(pattern, string)

View File

@ -73,9 +73,7 @@ class RoleConfigQuery(ConfigQueryModel):
for role in role_list: for role in role_list:
duplicate_role_list.append( duplicate_role_list.append(
{ {
"_id": "{}{}".format( "_id": f"{role.id}{region}", # this is only for sorting, isn't returned outside of this function
role.id, region
), # this is only for sorting, isn't returned outside of this functin
"type": "AWS::IAM::Role", "type": "AWS::IAM::Role",
"id": role.id, "id": role.id,
"name": role.name, "name": role.name,
@ -238,9 +236,7 @@ class PolicyConfigQuery(ConfigQueryModel):
for policy in policy_list: for policy in policy_list:
duplicate_policy_list.append( duplicate_policy_list.append(
{ {
"_id": "{}{}".format( "_id": f"{policy.id}{region}", # this is only for sorting, isn't returned outside of this function
policy.id, region
), # this is only for sorting, isn't returned outside of this functin
"type": "AWS::IAM::Policy", "type": "AWS::IAM::Policy",
"id": policy.id, "id": policy.id,
"name": policy.name, "name": policy.name,

View File

@ -37,9 +37,7 @@ class MalformedCertificate(RESTError):
code = 400 code = 400
def __init__(self, cert): def __init__(self, cert):
super().__init__( super().__init__("MalformedCertificate", f"Certificate {cert} is malformed")
"MalformedCertificate", "Certificate {cert} is malformed".format(cert=cert)
)
class MalformedPolicyDocument(RESTError): class MalformedPolicyDocument(RESTError):
@ -70,10 +68,8 @@ class TagKeyTooBig(RESTError):
def __init__(self, tag, param="tags.X.member.key"): def __init__(self, tag, param="tags.X.member.key"):
super().__init__( super().__init__(
"ValidationError", "ValidationError",
"1 validation error detected: Value '{}' at '{}' failed to satisfy " f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy "
"constraint: Member must have length less than or equal to 128.".format( "constraint: Member must have length less than or equal to 128.",
tag, param
),
) )
@ -83,10 +79,8 @@ class TagValueTooBig(RESTError):
def __init__(self, tag): def __init__(self, tag):
super().__init__( super().__init__(
"ValidationError", "ValidationError",
"1 validation error detected: Value '{}' at 'tags.X.member.value' failed to satisfy " f"1 validation error detected: Value '{tag}' at 'tags.X.member.value' failed to satisfy "
"constraint: Member must have length less than or equal to 256.".format( "constraint: Member must have length less than or equal to 256.",
tag
),
) )
@ -105,10 +99,8 @@ class TooManyTags(RESTError):
def __init__(self, tags, param="tags"): def __init__(self, tags, param="tags"):
super().__init__( super().__init__(
"ValidationError", "ValidationError",
"1 validation error detected: Value '{}' at '{}' failed to satisfy " f"1 validation error detected: Value '{tags}' at '{param}' failed to satisfy "
"constraint: Member must have length less than or equal to 50.".format( "constraint: Member must have length less than or equal to 50.",
tags, param
),
) )

View File

@ -272,9 +272,7 @@ class OpenIDConnectProvider(BaseModel):
) )
def _format_error(self, key, value, constraint): def _format_error(self, key, value, constraint):
return 'Value "{value}" at "{key}" failed to satisfy constraint: {constraint}'.format( return f'Value "{value}" at "{key}" failed to satisfy constraint: {constraint}'
constraint=constraint, key=key, value=value
)
def _raise_errors(self): def _raise_errors(self):
if self._errors: if self._errors:
@ -284,9 +282,7 @@ class OpenIDConnectProvider(BaseModel):
self._errors = [] # reset collected errors self._errors = [] # reset collected errors
raise ValidationError( raise ValidationError(
"{count} validation error{plural} detected: {errors}".format( f"{count} validation error{plural} detected: {errors}"
count=count, plural=plural, errors=errors
)
) )
def get_tags(self): def get_tags(self):
@ -328,9 +324,7 @@ class ManagedPolicy(Policy, CloudFormationModel):
@property @property
def arn(self): def arn(self):
return "arn:aws:iam::{0}:policy{1}{2}".format( return f"arn:aws:iam::{self.account_id}:policy{self.path}{self.name}"
self.account_id, self.path, self.name
)
def to_config_dict(self): def to_config_dict(self):
return { return {
@ -338,7 +332,7 @@ class ManagedPolicy(Policy, CloudFormationModel):
"configurationItemCaptureTime": str(self.create_date), "configurationItemCaptureTime": str(self.create_date),
"configurationItemStatus": "OK", "configurationItemStatus": "OK",
"configurationStateId": str(int(unix_time())), "configurationStateId": str(int(unix_time())),
"arn": "arn:aws:iam::{}:policy/{}".format(self.account_id, self.name), "arn": f"arn:aws:iam::{self.account_id}:policy/{self.name}",
"resourceType": "AWS::IAM::Policy", "resourceType": "AWS::IAM::Policy",
"resourceId": self.id, "resourceId": self.id,
"resourceName": self.name, "resourceName": self.name,
@ -349,7 +343,7 @@ class ManagedPolicy(Policy, CloudFormationModel):
"configuration": { "configuration": {
"policyName": self.name, "policyName": self.name,
"policyId": self.id, "policyId": self.id,
"arn": "arn:aws:iam::{}:policy/{}".format(self.account_id, self.name), "arn": f"arn:aws:iam::{self.account_id}:policy/{self.name}",
"path": self.path, "path": self.path,
"defaultVersionId": self.default_version_id, "defaultVersionId": self.default_version_id,
"attachmentCount": self.attachment_count, "attachmentCount": self.attachment_count,
@ -448,7 +442,7 @@ class AWSManagedPolicy(ManagedPolicy):
@property @property
def arn(self): def arn(self):
return "arn:aws:iam::aws:policy{0}{1}".format(self.path, self.name) return f"arn:aws:iam::aws:policy{self.path}{self.name}"
class InlinePolicy(CloudFormationModel): class InlinePolicy(CloudFormationModel):
@ -772,7 +766,7 @@ class Role(CloudFormationModel):
del self.policies[policy_name] del self.policies[policy_name]
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(
"The role policy with name {0} cannot be found.".format(policy_name) f"The role policy with name {policy_name} cannot be found."
) )
@property @property
@ -1141,7 +1135,7 @@ class Group(BaseModel):
try: try:
policy_json = self.policies[policy_name] policy_json = self.policies[policy_name]
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} not found".format(policy_name)) raise IAMNotFoundException(f"Policy {policy_name} not found")
return { return {
"policy_name": policy_name, "policy_name": policy_name,
@ -1157,7 +1151,7 @@ class Group(BaseModel):
def delete_policy(self, policy_name): def delete_policy(self, policy_name):
if policy_name not in self.policies: if policy_name not in self.policies:
raise IAMNotFoundException("Policy {0} not found".format(policy_name)) raise IAMNotFoundException(f"Policy {policy_name} not found")
del self.policies[policy_name] del self.policies[policy_name]
@ -1192,7 +1186,7 @@ class User(CloudFormationModel):
try: try:
policy_json = self.policies[policy_name] policy_json = self.policies[policy_name]
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} not found".format(policy_name)) raise IAMNotFoundException(f"Policy {policy_name} not found")
return { return {
"policy_name": policy_name, "policy_name": policy_name,
@ -1208,7 +1202,7 @@ class User(CloudFormationModel):
def delete_policy(self, policy_name): def delete_policy(self, policy_name):
if policy_name not in self.policies: if policy_name not in self.policies:
raise IAMNotFoundException("Policy {0} not found".format(policy_name)) raise IAMNotFoundException(f"Policy {policy_name} not found")
del self.policies[policy_name] del self.policies[policy_name]
@ -1352,21 +1346,31 @@ class User(CloudFormationModel):
else self.access_keys[1].last_used.strftime(date_format) else self.access_keys[1].last_used.strftime(date_format)
) )
return "{0},{1},{2},{3},{4},{5},not_supported,{6},{7},{8},{9},not_supported,not_supported,{10},{11},{12},not_supported,not_supported,false,N/A,false,N/A\n".format( fields = [
self.name, self.name,
self.arn, self.arn,
date_created.strftime(date_format), date_created.strftime(date_format),
password_enabled, password_enabled,
password_last_used, password_last_used,
date_created.strftime(date_format), date_created.strftime(date_format),
"not_supported",
"true" if len(self.mfa_devices) else "false", "true" if len(self.mfa_devices) else "false",
access_key_1_active, access_key_1_active,
access_key_1_last_rotated, access_key_1_last_rotated,
access_key_1_last_used, access_key_1_last_used,
"not_supported",
"not_supported",
access_key_2_active, access_key_2_active,
access_key_2_last_rotated, access_key_2_last_rotated,
access_key_2_last_used, access_key_2_last_used,
) "not_supported",
"not_supported",
"false",
"N/A",
"false",
"N/A",
]
return ",".join(fields) + "\n"
@staticmethod @staticmethod
def cloudformation_name_type(): def cloudformation_name_type():
@ -1503,9 +1507,7 @@ class AccountPasswordPolicy(BaseModel):
self._raise_errors() self._raise_errors()
def _format_error(self, key, value, constraint): def _format_error(self, key, value, constraint):
return 'Value "{value}" at "{key}" failed to satisfy constraint: {constraint}'.format( return f'Value "{value}" at "{key}" failed to satisfy constraint: {constraint}'
constraint=constraint, key=key, value=value
)
def _raise_errors(self): def _raise_errors(self):
if self._errors: if self._errors:
@ -1515,9 +1517,7 @@ class AccountPasswordPolicy(BaseModel):
self._errors = [] # reset collected errors self._errors = [] # reset collected errors
raise ValidationError( raise ValidationError(
"{count} validation error{plural} detected: {errors}".format( f"{count} validation error{plural} detected: {errors}"
count=count, plural=plural, errors=errors
)
) )
@ -1719,9 +1719,7 @@ class IAMBackend(BaseBackend):
): ):
raise RESTError( raise RESTError(
"InvalidParameterValue", "InvalidParameterValue",
"Value ({}) for parameter PermissionsBoundary is invalid.".format( f"Value ({permissions_boundary}) for parameter PermissionsBoundary is invalid.",
permissions_boundary
),
) )
role = self.get_role(role_name) role = self.get_role(role_name)
role.permissions_boundary = permissions_boundary role.permissions_boundary = permissions_boundary
@ -1737,7 +1735,7 @@ class IAMBackend(BaseBackend):
if policy.arn not in self.get_role(role_name).managed_policies.keys(): if policy.arn not in self.get_role(role_name).managed_policies.keys():
raise KeyError raise KeyError
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) raise IAMNotFoundException(f"Policy {policy_arn} was not found.")
policy.detach_from(self.get_role(role_name)) policy.detach_from(self.get_role(role_name))
def attach_group_policy(self, policy_arn, group_name): def attach_group_policy(self, policy_arn, group_name):
@ -1745,7 +1743,7 @@ class IAMBackend(BaseBackend):
try: try:
policy = arns[policy_arn] policy = arns[policy_arn]
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) raise IAMNotFoundException(f"Policy {policy_arn} was not found.")
if policy.arn in self.get_group(group_name).managed_policies.keys(): if policy.arn in self.get_group(group_name).managed_policies.keys():
return return
policy.attach_to(self.get_group(group_name)) policy.attach_to(self.get_group(group_name))
@ -1757,7 +1755,7 @@ class IAMBackend(BaseBackend):
if policy.arn not in self.get_group(group_name).managed_policies.keys(): if policy.arn not in self.get_group(group_name).managed_policies.keys():
raise KeyError raise KeyError
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) raise IAMNotFoundException(f"Policy {policy_arn} was not found.")
policy.detach_from(self.get_group(group_name)) policy.detach_from(self.get_group(group_name))
def attach_user_policy(self, policy_arn, user_name): def attach_user_policy(self, policy_arn, user_name):
@ -1765,7 +1763,7 @@ class IAMBackend(BaseBackend):
try: try:
policy = arns[policy_arn] policy = arns[policy_arn]
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) raise IAMNotFoundException(f"Policy {policy_arn} was not found.")
policy.attach_to(self.get_user(user_name)) policy.attach_to(self.get_user(user_name))
def detach_user_policy(self, policy_arn, user_name): def detach_user_policy(self, policy_arn, user_name):
@ -1775,7 +1773,7 @@ class IAMBackend(BaseBackend):
if policy.arn not in self.get_user(user_name).managed_policies.keys(): if policy.arn not in self.get_user(user_name).managed_policies.keys():
raise KeyError raise KeyError
except KeyError: except KeyError:
raise IAMNotFoundException("Policy {0} was not found.".format(policy_arn)) raise IAMNotFoundException(f"Policy {policy_arn} was not found.")
policy.detach_from(self.get_user(user_name)) policy.detach_from(self.get_user(user_name))
def create_policy(self, description, path, policy_document, policy_name, tags): def create_policy(self, description, path, policy_document, policy_name, tags):
@ -1793,16 +1791,14 @@ class IAMBackend(BaseBackend):
) )
if policy.arn in self.managed_policies: if policy.arn in self.managed_policies:
raise EntityAlreadyExists( raise EntityAlreadyExists(
"A policy called {0} already exists. Duplicate names are not allowed.".format( f"A policy called {policy_name} already exists. Duplicate names are not allowed."
policy_name
)
) )
self.managed_policies[policy.arn] = policy self.managed_policies[policy.arn] = policy
return policy return policy
def get_policy(self, policy_arn): def get_policy(self, policy_arn):
if policy_arn not in self.managed_policies: if policy_arn not in self.managed_policies:
raise IAMNotFoundException("Policy {0} not found".format(policy_arn)) raise IAMNotFoundException(f"Policy {policy_arn} not found")
return self.managed_policies.get(policy_arn) return self.managed_policies.get(policy_arn)
def list_attached_role_policies( def list_attached_role_policies(
@ -1839,9 +1835,7 @@ class IAMBackend(BaseBackend):
def set_default_policy_version(self, policy_arn, version_id): def set_default_policy_version(self, policy_arn, version_id):
if re.match(r"v[1-9][0-9]*(\.[A-Za-z0-9-]*)?", version_id) is None: if re.match(r"v[1-9][0-9]*(\.[A-Za-z0-9-]*)?", version_id) is None:
raise ValidationError( raise ValidationError(
"Value '{0}' at 'versionId' failed to satisfy constraint: Member must satisfy regular expression pattern: v[1-9][0-9]*(\\.[A-Za-z0-9-]*)?".format( f"Value '{version_id}' at 'versionId' failed to satisfy constraint: Member must satisfy regular expression pattern: v[1-9][0-9]*(\\.[A-Za-z0-9-]*)?"
version_id
)
) )
policy = self.get_policy(policy_arn) policy = self.get_policy(policy_arn)
@ -1852,9 +1846,7 @@ class IAMBackend(BaseBackend):
return True return True
raise NoSuchEntity( raise NoSuchEntity(
"Policy {0} version {1} does not exist or is not attachable.".format( f"Policy {policy_arn} version {version_id} does not exist or is not attachable."
policy_arn, version_id
)
) )
def _filter_attached_policies(self, policies, marker, max_items, path_prefix): def _filter_attached_policies(self, policies, marker, max_items, path_prefix):
@ -1890,14 +1882,10 @@ class IAMBackend(BaseBackend):
): ):
raise RESTError( raise RESTError(
"InvalidParameterValue", "InvalidParameterValue",
"Value ({}) for parameter PermissionsBoundary is invalid.".format( f"Value ({permissions_boundary}) for parameter PermissionsBoundary is invalid.",
permissions_boundary
),
) )
if [role for role in self.get_roles() if role.name == role_name]: if [role for role in self.get_roles() if role.name == role_name]:
raise EntityAlreadyExists( raise EntityAlreadyExists(f"Role with name {role_name} already exists.")
"Role with name {0} already exists.".format(role_name)
)
clean_tags = self._tag_verification(tags) clean_tags = self._tag_verification(tags)
role = Role( role = Role(
@ -1922,13 +1910,13 @@ class IAMBackend(BaseBackend):
for role in self.get_roles(): for role in self.get_roles():
if role.name == role_name: if role.name == role_name:
return role return role
raise IAMNotFoundException("Role {0} not found".format(role_name)) raise IAMNotFoundException(f"Role {role_name} not found")
def get_role_by_arn(self, arn: str) -> Role: def get_role_by_arn(self, arn: str) -> Role:
for role in self.get_roles(): for role in self.get_roles():
if role.arn == arn: if role.arn == arn:
return role return role
raise IAMNotFoundException("Role {0} not found".format(arn)) raise IAMNotFoundException(f"Role {arn} not found")
def delete_role(self, role_name): def delete_role(self, role_name):
role = self.get_role(role_name) role = self.get_role(role_name)
@ -1977,9 +1965,7 @@ class IAMBackend(BaseBackend):
if p == policy_name: if p == policy_name:
return p, d return p, d
raise IAMNotFoundException( raise IAMNotFoundException(
"Policy Document {0} not attached to role {1}".format( f"Policy Document {policy_name} not attached to role {role_name}"
policy_name, role_name
)
) )
def list_role_policies(self, role_name): def list_role_policies(self, role_name):
@ -2118,7 +2104,7 @@ class IAMBackend(BaseBackend):
set_as_default = set_as_default == "true" # convert it to python bool set_as_default = set_as_default == "true" # convert it to python bool
version = PolicyVersion(policy_arn, policy_document, set_as_default) version = PolicyVersion(policy_arn, policy_document, set_as_default)
policy.versions.append(version) policy.versions.append(version)
version.version_id = "v{0}".format(policy.next_version_num) version.version_id = f"v{policy.next_version_num}"
policy.next_version_num += 1 policy.next_version_num += 1
if set_as_default: if set_as_default:
policy.update_default_version(version.version_id) policy.update_default_version(version.version_id)
@ -2158,7 +2144,7 @@ class IAMBackend(BaseBackend):
if self.instance_profiles.get(name): if self.instance_profiles.get(name):
raise IAMConflictException( raise IAMConflictException(
code="EntityAlreadyExists", code="EntityAlreadyExists",
message="Instance Profile {0} already exists.".format(name), message=f"Instance Profile {name} already exists.",
) )
instance_profile_id = random_resource_id() instance_profile_id = random_resource_id()
@ -2184,16 +2170,14 @@ class IAMBackend(BaseBackend):
if profile.name == profile_name: if profile.name == profile_name:
return profile return profile
raise IAMNotFoundException( raise IAMNotFoundException(f"Instance profile {profile_name} not found")
"Instance profile {0} not found".format(profile_name)
)
def get_instance_profile_by_arn(self, profile_arn): def get_instance_profile_by_arn(self, profile_arn):
for profile in self.get_instance_profiles(): for profile in self.get_instance_profiles():
if profile.arn == profile_arn: if profile.arn == profile_arn:
return profile return profile
raise IAMNotFoundException("Instance profile {0} not found".format(profile_arn)) raise IAMNotFoundException(f"Instance profile {profile_arn} not found")
def get_instance_profiles(self) -> List[InstanceProfile]: def get_instance_profiles(self) -> List[InstanceProfile]:
return self.instance_profiles.values() return self.instance_profiles.values()
@ -2240,7 +2224,7 @@ class IAMBackend(BaseBackend):
return cert return cert
raise IAMNotFoundException( raise IAMNotFoundException(
"The Server Certificate with name {0} cannot be " "found.".format(name) f"The Server Certificate with name {name} cannot be found."
) )
def get_certificate_by_arn(self, arn): def get_certificate_by_arn(self, arn):
@ -2258,14 +2242,14 @@ class IAMBackend(BaseBackend):
if cert_id is None: if cert_id is None:
raise IAMNotFoundException( raise IAMNotFoundException(
"The Server Certificate with name {0} cannot be " "found.".format(name) f"The Server Certificate with name {name} cannot be found."
) )
self.certificates.pop(cert_id, None) self.certificates.pop(cert_id, None)
def create_group(self, group_name, path="/"): def create_group(self, group_name, path="/"):
if group_name in self.groups: if group_name in self.groups:
raise IAMConflictException("Group {0} already exists".format(group_name)) raise IAMConflictException(f"Group {group_name} already exists")
group = Group(self.account_id, group_name, path) group = Group(self.account_id, group_name, path)
self.groups[group_name] = group self.groups[group_name] = group
@ -2278,7 +2262,7 @@ class IAMBackend(BaseBackend):
try: try:
return self.groups[group_name] return self.groups[group_name]
except KeyError: except KeyError:
raise IAMNotFoundException("Group {0} not found".format(group_name)) raise IAMNotFoundException(f"Group {group_name} not found")
def list_groups(self): def list_groups(self):
return self.groups.values() return self.groups.values()
@ -2319,20 +2303,20 @@ class IAMBackend(BaseBackend):
del self.groups[group_name] del self.groups[group_name]
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(
"The group with name {0} cannot be found.".format(group_name) f"The group with name {group_name} cannot be found."
) )
def update_group(self, group_name, new_group_name, new_path): def update_group(self, group_name, new_group_name, new_path):
if new_group_name: if new_group_name:
if new_group_name in self.groups: if new_group_name in self.groups:
raise IAMConflictException( raise IAMConflictException(
message="Group {0} already exists".format(new_group_name) message=f"Group {new_group_name} already exists"
) )
try: try:
group = self.groups[group_name] group = self.groups[group_name]
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(
"The group with name {0} cannot be found.".format(group_name) f"The group with name {group_name} cannot be found."
) )
existing_policies = group.managed_policies.copy() existing_policies = group.managed_policies.copy()
@ -2348,7 +2332,7 @@ class IAMBackend(BaseBackend):
def create_user(self, user_name, path="/", tags=None): def create_user(self, user_name, path="/", tags=None):
if user_name in self.users: if user_name in self.users:
raise IAMConflictException( raise IAMConflictException(
"EntityAlreadyExists", "User {0} already exists".format(user_name) "EntityAlreadyExists", f"User {user_name} already exists"
) )
user = User(self.account_id, user_name, path) user = User(self.account_id, user_name, path)
@ -2360,7 +2344,7 @@ class IAMBackend(BaseBackend):
user = self.users.get(name) user = self.users.get(name)
if not user: if not user:
raise NoSuchEntity("The user with name {} cannot be found.".format(name)) raise NoSuchEntity(f"The user with name {name} cannot be found.")
return user return user
@ -2374,7 +2358,7 @@ class IAMBackend(BaseBackend):
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(
"Users {0}, {1}, {2} not found".format(path_prefix, marker, max_items) f"Users {path_prefix}, {marker}, {max_items} not found"
) )
return users return users
@ -2383,7 +2367,7 @@ class IAMBackend(BaseBackend):
try: try:
user = self.users[user_name] user = self.users[user_name]
except KeyError: except KeyError:
raise IAMNotFoundException("User {0} not found".format(user_name)) raise IAMNotFoundException(f"User {user_name} not found")
if new_path: if new_path:
user.path = new_path user.path = new_path
@ -2438,7 +2422,7 @@ class IAMBackend(BaseBackend):
del user.signing_certificates[cert_id] del user.signing_certificates[cert_id]
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(
"The Certificate with id {id} cannot be found.".format(id=cert_id) f"The Certificate with id {cert_id} cannot be found."
) )
def list_signing_certificates(self, user_name): def list_signing_certificates(self, user_name):
@ -2454,34 +2438,28 @@ class IAMBackend(BaseBackend):
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(
"The Certificate with id {id} cannot be found.".format(id=cert_id) f"The Certificate with id {cert_id} cannot be found."
) )
def create_login_profile(self, user_name, password): def create_login_profile(self, user_name, password):
# This does not currently deal with PasswordPolicyViolation. # This does not currently deal with PasswordPolicyViolation.
user = self.get_user(user_name) user = self.get_user(user_name)
if user.password: if user.password:
raise IAMConflictException( raise IAMConflictException(f"User {user_name} already has password")
"User {0} already has password".format(user_name)
)
user.password = password user.password = password
return user return user
def get_login_profile(self, user_name): def get_login_profile(self, user_name):
user = self.get_user(user_name) user = self.get_user(user_name)
if not user.password: if not user.password:
raise IAMNotFoundException( raise IAMNotFoundException(f"Login profile for {user_name} not found")
"Login profile for {0} not found".format(user_name)
)
return user return user
def update_login_profile(self, user_name, password, password_reset_required): def update_login_profile(self, user_name, password, password_reset_required):
# This does not currently deal with PasswordPolicyViolation. # This does not currently deal with PasswordPolicyViolation.
user = self.get_user(user_name) user = self.get_user(user_name)
if not user.password: if not user.password:
raise IAMNotFoundException( raise IAMNotFoundException(f"Login profile for {user_name} not found")
"Login profile for {0} not found".format(user_name)
)
user.password = password user.password = password
user.password_reset_required = password_reset_required user.password_reset_required = password_reset_required
return user return user
@ -2489,9 +2467,7 @@ class IAMBackend(BaseBackend):
def delete_login_profile(self, user_name): def delete_login_profile(self, user_name):
user = self.get_user(user_name) user = self.get_user(user_name)
if not user.password: if not user.password:
raise IAMNotFoundException( raise IAMNotFoundException(f"Login profile for {user_name} not found")
"Login profile for {0} not found".format(user_name)
)
user.password = None user.password = None
def add_user_to_group(self, group_name, user_name): def add_user_to_group(self, group_name, user_name):
@ -2505,9 +2481,7 @@ class IAMBackend(BaseBackend):
try: try:
group.users.remove(user) group.users.remove(user)
except ValueError: except ValueError:
raise IAMNotFoundException( raise IAMNotFoundException(f"User {user_name} not in group {group_name}")
"User {0} not in group {1}".format(user_name, group_name)
)
def get_user_policy(self, user_name, policy_name): def get_user_policy(self, user_name, policy_name):
user = self.get_user(user_name) user = self.get_user(user_name)
@ -2625,7 +2599,7 @@ class IAMBackend(BaseBackend):
user = self.get_user(user_name) user = self.get_user(user_name)
if serial_number in user.mfa_devices: if serial_number in user.mfa_devices:
raise IAMConflictException( raise IAMConflictException(
"EntityAlreadyExists", "Device {0} already exists".format(serial_number) "EntityAlreadyExists", f"Device {serial_number} already exists"
) )
device = self.virtual_mfa_devices.get(serial_number, None) device = self.virtual_mfa_devices.get(serial_number, None)
@ -2651,7 +2625,7 @@ class IAMBackend(BaseBackend):
"""Deactivate and detach MFA Device from user if device exists.""" """Deactivate and detach MFA Device from user if device exists."""
user = self.get_user(user_name) user = self.get_user(user_name)
if serial_number not in user.mfa_devices: if serial_number not in user.mfa_devices:
raise IAMNotFoundException("Device {0} not found".format(serial_number)) raise IAMNotFoundException(f"Device {serial_number} not found")
device = self.virtual_mfa_devices.get(serial_number, None) device = self.virtual_mfa_devices.get(serial_number, None)
if device: if device:
@ -2703,9 +2677,7 @@ class IAMBackend(BaseBackend):
if not device: if not device:
raise IAMNotFoundException( raise IAMNotFoundException(
"VirtualMFADevice with serial number {0} doesn't exist.".format( f"VirtualMFADevice with serial number {serial_number} doesn't exist."
serial_number
)
) )
def list_virtual_mfa_devices(self, assignment_status, marker, max_items): def list_virtual_mfa_devices(self, assignment_status, marker, max_items):
@ -2815,9 +2787,7 @@ class IAMBackend(BaseBackend):
if saml_provider.arn == saml_provider_arn: if saml_provider.arn == saml_provider_arn:
del self.saml_providers[saml_provider.name] del self.saml_providers[saml_provider.name]
except KeyError: except KeyError:
raise IAMNotFoundException( raise IAMNotFoundException(f"SAMLProvider {saml_provider_arn} not found")
"SAMLProvider {0} not found".format(saml_provider_arn)
)
def list_saml_providers(self): def list_saml_providers(self):
return self.saml_providers.values() return self.saml_providers.values()
@ -2826,9 +2796,7 @@ class IAMBackend(BaseBackend):
for saml_provider in self.list_saml_providers(): for saml_provider in self.list_saml_providers():
if saml_provider.arn == saml_provider_arn: if saml_provider.arn == saml_provider_arn:
return saml_provider return saml_provider
raise IAMNotFoundException( raise IAMNotFoundException(f"SamlProvider {saml_provider_arn} not found")
"SamlProvider {0} not found".format(saml_provider_arn)
)
def get_user_from_access_key_id(self, access_key_id): def get_user_from_access_key_id(self, access_key_id):
for user_name, user in self.users.items(): for user_name, user in self.users.items():
@ -2894,7 +2862,7 @@ class IAMBackend(BaseBackend):
if not open_id_provider: if not open_id_provider:
raise IAMNotFoundException( raise IAMNotFoundException(
"OpenIDConnect Provider not found for arn {}".format(arn) f"OpenIDConnect Provider not found for arn {arn}"
) )
return open_id_provider return open_id_provider
@ -2956,8 +2924,7 @@ class IAMBackend(BaseBackend):
): ):
if resource_name in self.inline_policies: if resource_name in self.inline_policies:
raise IAMConflictException( raise IAMConflictException(
"EntityAlreadyExists", "EntityAlreadyExists", f"Inline Policy {resource_name} already exists"
"Inline Policy {0} already exists".format(resource_name),
) )
inline_policy = InlinePolicy( inline_policy = InlinePolicy(
@ -2976,7 +2943,7 @@ class IAMBackend(BaseBackend):
try: try:
return self.inline_policies[policy_id] return self.inline_policies[policy_id]
except KeyError: except KeyError:
raise IAMNotFoundException("Inline policy {0} not found".format(policy_id)) raise IAMNotFoundException(f"Inline policy {policy_id} not found")
def update_inline_policy( def update_inline_policy(
self, self,

View File

@ -308,9 +308,7 @@ class BaseIAMPolicyValidator:
vendor_pattern = re.compile(r"[^a-zA-Z0-9\-.]") vendor_pattern = re.compile(r"[^a-zA-Z0-9\-.]")
if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]): if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]):
raise MalformedPolicyDocument( raise MalformedPolicyDocument(f"Vendor {action_parts[0]} is not valid")
"Vendor {vendor} is not valid".format(vendor=action_parts[0])
)
def _validate_resources_for_formats(self): def _validate_resources_for_formats(self):
self._validate_resource_like_for_formats("Resource") self._validate_resource_like_for_formats("Resource")
@ -366,13 +364,8 @@ class BaseIAMPolicyValidator:
if len(remaining_resource_parts) > 3 if len(remaining_resource_parts) > 3
else "*" else "*"
) )
self._resource_error = 'Partition "{partition}" is not valid for resource "arn:{partition}:{arn1}:{arn2}:{arn3}:{arn4}".'.format( pt = resource_partitions[0]
partition=resource_partitions[0], self._resource_error = f'Partition "{pt}" is not valid for resource "arn:{pt}:{arn1}:{arn2}:{arn3}:{arn4}".'
arn1=arn1,
arn2=arn2,
arn3=arn3,
arn4=arn4,
)
return return
if resource_partitions[1] != ":": if resource_partitions[1] != ":":

View File

@ -47,6 +47,6 @@ class InstanceMetadataResponse(BaseResponse):
result = json.dumps(credentials) result = json.dumps(credentials)
else: else:
raise NotImplementedError( raise NotImplementedError(
"The {0} metadata path has not been implemented".format(path) f"The {path} metadata path has not been implemented"
) )
return 200, headers, result return 200, headers, result

View File

@ -35,14 +35,14 @@ class VersionConflictException(IoTClientError):
self.code = 409 self.code = 409
super().__init__( super().__init__(
"VersionConflictException", "VersionConflictException",
"The version for thing %s does not match the expected version." % name, f"The version for thing {name} does not match the expected version.",
) )
class CertificateStateException(IoTClientError): class CertificateStateException(IoTClientError):
def __init__(self, msg, cert_id): def __init__(self, msg, cert_id):
self.code = 406 self.code = 406
super().__init__("CertificateStateException", "%s Id: %s" % (msg, cert_id)) super().__init__("CertificateStateException", f"{msg} Id: {cert_id}")
class DeleteConflictException(IoTClientError): class DeleteConflictException(IoTClientError):
@ -71,7 +71,7 @@ class VersionsLimitExceededException(IoTClientError):
self.code = 409 self.code = 409
super().__init__( super().__init__(
"VersionsLimitExceededException", "VersionsLimitExceededException",
"The policy %s already has the maximum number of versions (5)" % name, f"The policy {name} already has the maximum number of versions (5)",
) )

View File

@ -73,7 +73,7 @@ class FakeThingType(BaseModel):
self.thing_type_id = str(random.uuid4()) # I don't know the rule of id self.thing_type_id = str(random.uuid4()) # I don't know the rule of id
t = time.time() t = time.time()
self.metadata = {"deprecated": False, "creationDate": int(t * 1000) / 1000.0} self.metadata = {"deprecated": False, "creationDate": int(t * 1000) / 1000.0}
self.arn = "arn:aws:iot:%s:1:thingtype/%s" % (self.region_name, thing_type_name) self.arn = f"arn:aws:iot:{self.region_name}:1:thingtype/{thing_type_name}"
def to_dict(self): def to_dict(self):
return { return {
@ -127,10 +127,7 @@ class FakeThingGroup(BaseModel):
} }
] ]
) )
self.arn = "arn:aws:iot:%s:1:thinggroup/%s" % ( self.arn = f"arn:aws:iot:{self.region_name}:1:thinggroup/{thing_group_name}"
self.region_name,
thing_group_name,
)
self.things = OrderedDict() self.things = OrderedDict()
def to_dict(self): def to_dict(self):
@ -306,7 +303,7 @@ class FakeJob(BaseModel):
self.region_name = region_name self.region_name = region_name
self.job_id = job_id self.job_id = job_id
self.job_arn = "arn:aws:iot:%s:1:job/%s" % (self.region_name, job_id) self.job_arn = f"arn:aws:iot:{self.region_name}:1:job/{job_id}"
self.targets = targets self.targets = targets
self.document_source = document_source self.document_source = document_source
self.document = document self.document = document
@ -427,26 +424,20 @@ class FakeEndpoint(BaseModel):
]: ]:
raise InvalidRequestException( raise InvalidRequestException(
" An error occurred (InvalidRequestException) when calling the DescribeEndpoint " " An error occurred (InvalidRequestException) when calling the DescribeEndpoint "
"operation: Endpoint type %s not recognized." % endpoint_type f"operation: Endpoint type {endpoint_type} not recognized."
) )
self.region_name = region_name self.region_name = region_name
identifier = random.get_random_string(length=14, lower_case=True) identifier = random.get_random_string(length=14, lower_case=True)
if endpoint_type == "iot:Data": if endpoint_type == "iot:Data":
self.endpoint = "{i}.iot.{r}.amazonaws.com".format( self.endpoint = f"{identifier}.iot.{self.region_name}.amazonaws.com"
i=identifier, r=self.region_name
)
elif "iot:Data-ATS" in endpoint_type: elif "iot:Data-ATS" in endpoint_type:
self.endpoint = "{i}-ats.iot.{r}.amazonaws.com".format( self.endpoint = f"{identifier}-ats.iot.{self.region_name}.amazonaws.com"
i=identifier, r=self.region_name
)
elif "iot:CredentialProvider" in endpoint_type: elif "iot:CredentialProvider" in endpoint_type:
self.endpoint = "{i}.credentials.iot.{r}.amazonaws.com".format( self.endpoint = (
i=identifier, r=self.region_name f"{identifier}.credentials.iot.{self.region_name}.amazonaws.com"
) )
elif "iot:Jobs" in endpoint_type: elif "iot:Jobs" in endpoint_type:
self.endpoint = "{i}.jobs.iot.{r}.amazonaws.com".format( self.endpoint = f"{identifier}.jobs.iot.{self.region_name}.amazonaws.com"
i=identifier, r=self.region_name
)
self.endpoint_type = endpoint_type self.endpoint_type = endpoint_type
def to_get_dict(self): def to_get_dict(self):
@ -488,7 +479,7 @@ class FakeRule(BaseModel):
self.error_action = error_action or {} self.error_action = error_action or {}
self.sql = sql self.sql = sql
self.aws_iot_sql_version = aws_iot_sql_version or "2016-03-23" self.aws_iot_sql_version = aws_iot_sql_version or "2016-03-23"
self.arn = "arn:aws:iot:%s:1:rule/%s" % (self.region_name, rule_name) self.arn = f"arn:aws:iot:{self.region_name}:1:rule/{rule_name}"
def to_get_dict(self): def to_get_dict(self):
return { return {
@ -530,14 +521,10 @@ class FakeDomainConfiguration(BaseModel):
if service_type and service_type not in ["DATA", "CREDENTIAL_PROVIDER", "JOBS"]: if service_type and service_type not in ["DATA", "CREDENTIAL_PROVIDER", "JOBS"]:
raise InvalidRequestException( raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration " "An error occurred (InvalidRequestException) when calling the DescribeDomainConfiguration "
"operation: Service type %s not recognized." % service_type f"operation: Service type {service_type} not recognized."
) )
self.domain_configuration_name = domain_configuration_name self.domain_configuration_name = domain_configuration_name
self.domain_configuration_arn = "arn:aws:iot:%s:1:domainconfiguration/%s/%s" % ( self.domain_configuration_arn = f"arn:aws:iot:{region_name}:1:domainconfiguration/{domain_configuration_name}/{random.get_random_string(length=5)}"
region_name,
domain_configuration_name,
random.get_random_string(length=5),
)
self.domain_name = domain_name self.domain_name = domain_name
self.server_certificates = [] self.server_certificates = []
if server_certificate_arns: if server_certificate_arns:
@ -871,7 +858,7 @@ class IoTBackend(BaseBackend):
] ]
if len(certs) > 0: if len(certs) > 0:
raise DeleteConflictException( raise DeleteConflictException(
"Things must be detached before deletion (arn: %s)" % certs[0] f"Things must be detached before deletion (arn: {certs[0]})"
) )
certs = [ certs = [