Techdebt: Replace string-format with f-strings (for c* dirs) (#5656)

Bert Blommers 2022-11-11 09:57:17 -01:00 committed by GitHub
parent 37845792d3
commit 222621fe94
22 changed files with 93 additions and 154 deletions
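The change throughout this commit is mechanical: str.format() calls (plus the occasional %-style format) in the c* service directories are rewritten as f-strings, which embed the expression directly in the string literal. A minimal sketch of the pattern, using an illustrative variable rather than repository code:

    stack_name = "test-stack"

    # Before: positional placeholder filled via str.format()
    old = "Stack with id {0} does not exist".format(stack_name)

    # After: the expression sits inside the literal itself
    new = f"Stack with id {stack_name} does not exist"

    assert old == new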

View File

@ -13,7 +13,7 @@ class UnformattedGetAttTemplateException(Exception):
class ValidationError(RESTError):
def __init__(self, name_or_id: Optional[str] = None, message: Optional[str] = None):
if message is None:
message = "Stack with id {0} does not exist".format(name_or_id)
message = f"Stack with id {name_or_id} does not exist"
template = Template(ERROR_RESPONSE)
super().__init__(error_type="ValidationError", message=message)
@ -23,7 +23,7 @@ class ValidationError(RESTError):
class MissingParameterError(RESTError):
def __init__(self, parameter_name: str):
template = Template(ERROR_RESPONSE)
message = "Missing parameter {0}".format(parameter_name)
message = f"Missing parameter {parameter_name}"
super().__init__(error_type="ValidationError", message=message)
self.description = template.render(code="Missing Parameter", message=message)
@ -33,7 +33,7 @@ class ExportNotFound(RESTError):
def __init__(self, export_name: str):
template = Template(ERROR_RESPONSE)
message = "No export named {0} found.".format(export_name)
message = f"No export named {export_name} found."
super().__init__(error_type="ExportNotFound", message=message)
self.description = template.render(code="ExportNotFound", message=message)

View File

@ -178,7 +178,7 @@ class FakeStackInstances(BaseModel):
):
self.parameters = parameters or {}
self.stackset_id = stackset_id
self.stack_name = "StackSet-{}".format(stackset_id)
self.stack_name = f"StackSet-{stackset_id}"
self.stackset_name = stackset_name
self.stack_instances: List[Dict[str, Any]] = []
@ -503,29 +503,17 @@ class FakeEvent(BaseModel):
def sendToSns(
self, account_id: str, region: str, sns_topic_arns: List[str]
) -> None:
message = """StackId='{stack_id}'
Timestamp='{timestamp}'
EventId='{event_id}'
LogicalResourceId='{logical_resource_id}'
message = f"""StackId='{self.stack_id}'
Timestamp='{iso_8601_datetime_with_milliseconds(self.timestamp)}'
EventId='{self.event_id}'
LogicalResourceId='{self.logical_resource_id}'
Namespace='{account_id}'
ResourceProperties='{resource_properties}'
ResourceStatus='{resource_status}'
ResourceStatusReason='{resource_status_reason}'
ResourceType='{resource_type}'
StackName='{stack_name}'
ClientRequestToken='{client_request_token}'""".format(
stack_id=self.stack_id,
timestamp=iso_8601_datetime_with_milliseconds(self.timestamp),
event_id=self.event_id,
logical_resource_id=self.logical_resource_id,
account_id=account_id,
resource_properties=self.resource_properties,
resource_status=self.resource_status,
resource_status_reason=self.resource_status_reason,
resource_type=self.resource_type,
stack_name=self.stack_name,
client_request_token=self.client_request_token,
)
ResourceProperties='{self.resource_properties}'
ResourceStatus='{self.resource_status}'
ResourceStatusReason='{self.resource_status_reason}'
ResourceType='{self.resource_type}'
StackName='{self.stack_name}'
ClientRequestToken='{self.client_request_token}'"""
for sns_topic_arn in sns_topic_arns:
sns_backends[account_id][region].publish(

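The FakeEvent.sendToSns hunk above folds a long triple-quoted .format() template into a triple-quoted f-string: attribute lookups and the iso_8601_datetime_with_milliseconds() call move straight into the placeholders. A small self-contained sketch of the same idea, assuming a hypothetical Event class rather than moto code:

    from datetime import datetime, timezone

    class Event:
        def __init__(self, event_id: str) -> None:
            self.event_id = event_id
            self.timestamp = datetime.now(timezone.utc)

        def to_message(self) -> str:
            # Placeholders in a triple-quoted f-string can hold attribute
            # access and function calls, so no .format() mapping is needed.
            return f"""EventId='{self.event_id}'
Timestamp='{self.timestamp.isoformat()}'"""

    print(Event("evt-1").to_message())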
View File

@ -97,7 +97,7 @@ class Output(object):
self.value = value
def __repr__(self) -> str:
return 'Output:"%s"="%s"' % (self.key, self.value)
return f'Output:"{self.key}"="{self.value}"'
class LazyDict(Dict[str, Any]):
@ -225,7 +225,7 @@ def clean_json(resource_json: Any, resources_map: "ResourceMap") -> Any:
result = []
# TODO: make this configurable, to reflect the real AWS AZs
for az in ("a", "b", "c", "d"):
result.append("%s%s" % (region, az))
result.append(f"{region}{az}")
return result
cleaned_json = {}
@ -268,24 +268,22 @@ def generate_resource_name(resource_type: str, stack_name: str, logical_id: str)
]:
# Target group names need to be less than 32 characters, so when cloudformation creates a name for you
# it makes sure to stay under that limit
name_prefix = "{0}-{1}".format(stack_name, logical_id)
name_prefix = f"{stack_name}-{logical_id}"
my_random_suffix = random_suffix()
truncated_name_prefix = name_prefix[0 : 32 - (len(my_random_suffix) + 1)]
# if the truncated name ends in a dash, we'll end up with a double dash in the final name, which is
# not allowed
if truncated_name_prefix.endswith("-"):
truncated_name_prefix = truncated_name_prefix[:-1]
return "{0}-{1}".format(truncated_name_prefix, my_random_suffix)
return f"{truncated_name_prefix}-{my_random_suffix}"
elif resource_type == "AWS::S3::Bucket":
right_hand_part_of_name = "-{0}-{1}".format(logical_id, random_suffix())
right_hand_part_of_name = f"-{logical_id}-{random_suffix()}"
max_stack_name_portion_len = 63 - len(right_hand_part_of_name)
return "{0}{1}".format(
stack_name[:max_stack_name_portion_len], right_hand_part_of_name
).lower()
return f"{stack_name[:max_stack_name_portion_len]}{right_hand_part_of_name}".lower()
elif resource_type == "AWS::IAM::Policy":
return "{0}-{1}-{2}".format(stack_name[:5], logical_id[:4], random_suffix())
return f"{stack_name[:5]}-{logical_id[:4]}-{random_suffix()}"
else:
return "{0}-{1}-{2}".format(stack_name, logical_id, random_suffix())
return f"{stack_name}-{logical_id}-{random_suffix()}"
def parse_resource(
@ -295,9 +293,7 @@ def parse_resource(
resource_class = resource_class_from_type(resource_type)
if not resource_class:
warnings.warn(
"Tried to parse {0} but it's not supported by moto's CloudFormation implementation".format(
resource_type
)
f"Tried to parse {resource_type} but it's not supported by moto's CloudFormation implementation"
)
return None # type: ignore[return-value]

View File

@ -280,8 +280,8 @@ class CloudFormationResponse(BaseResponse):
break
if not resource:
message = "Resource {0} does not exist for stack {1}".format(
logical_resource_id, stack_name
message = (
f"Resource {logical_resource_id} does not exist for stack {stack_name}"
)
raise ValidationError(stack_name, message)
@ -377,8 +377,7 @@ class CloudFormationResponse(BaseResponse):
if stack.status == "ROLLBACK_COMPLETE":
raise ValidationError(
stack.stack_id,
message="Stack:{0} is in ROLLBACK_COMPLETE state and can not "
"be updated.".format(stack.stack_id),
message=f"Stack:{stack.stack_id} is in ROLLBACK_COMPLETE state and can not be updated.",
)
def update_stack(self) -> str:

View File

@ -19,7 +19,7 @@ def generate_changeset_id(
def generate_stackset_id(stackset_name: str) -> str:
random_id = random.uuid4()
return "{}:{}".format(stackset_name, random_id)
return f"{stackset_name}:{random_id}"
def generate_stackset_arn(stackset_id: str, region_name: str, account_id: str) -> str:
@ -48,7 +48,7 @@ def yaml_tag_constructor(loader: Any, tag: Any, node: Any) -> Any:
if tag == "!Ref":
key = "Ref"
else:
key = "Fn::{}".format(tag[1:])
key = f"Fn::{tag[1:]}"
return {key: _f(loader, tag, node)}

View File

@ -281,7 +281,7 @@ class Dashboard(BaseModel):
return len(self.body)
def __repr__(self) -> str:
return "<CloudWatchDashboard {0}>".format(self.name)
return f"<CloudWatchDashboard {self.name}>"
class Statistics:

View File

@ -1,6 +1,6 @@
def make_arn_for_dashboard(account_id: str, name: str) -> str:
return "arn:aws:cloudwatch::{0}dashboard/{1}".format(account_id, name)
return f"arn:aws:cloudwatch::{account_id}dashboard/{name}"
def make_arn_for_alarm(region: str, account_id: str, alarm_name: str) -> str:
return "arn:aws:cloudwatch:{0}:{1}:alarm:{2}".format(region, account_id, alarm_name)
return f"arn:aws:cloudwatch:{region}:{account_id}:alarm:{alarm_name}"

View File

@ -103,9 +103,9 @@ class CodeBuild(BaseModel):
self.project_metadata: Dict[str, Any] = dict()
self.project_metadata["name"] = project_name
self.project_metadata["arn"] = "arn:aws:codebuild:{0}:{1}:project/{2}".format(
region, account_id, self.project_metadata["name"]
)
self.project_metadata[
"arn"
] = f"arn:aws:codebuild:{region}:{account_id}:project/{project_name}"
self.project_metadata[
"encryptionKey"
] = f"arn:aws:kms:{region}:{account_id}:alias/aws/s3"
@ -178,7 +178,7 @@ class CodeBuildBackend(BaseBackend):
artifact_override: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
build_id = "{0}:{1}".format(project_name, mock_random.uuid4())
build_id = f"{project_name}:{mock_random.uuid4()}"
# construct a new build
self.build_metadata[project_name] = CodeBuildProjectMetadata(

View File

@ -58,9 +58,7 @@ def _validate_required_params_environment(environment: Dict[str, Any]) -> None:
"LINUX_GPU_CONTAINER",
"ARM_CONTAINER",
]:
raise InvalidInputException(
"Invalid type provided: {0}".format(environment["type"])
)
raise InvalidInputException(f"Invalid type provided: {environment['type']}")
if environment["computeType"] not in [
"BUILD_GENERAL1_SMALL",
@ -69,7 +67,7 @@ def _validate_required_params_environment(environment: Dict[str, Any]) -> None:
"BUILD_GENERAL1_2XLARGE",
]:
raise InvalidInputException(
"Invalid compute type provided: {0}".format(environment["computeType"])
f"Invalid compute type provided: {environment['computeType']}"
)
@ -90,7 +88,7 @@ def _validate_required_params_id(build_id: str, build_ids: List[str]) -> None:
raise InvalidInputException("Invalid build ID provided")
if build_id not in build_ids:
raise ResourceNotFoundException("Build {0} does not exist".format(build_id))
raise ResourceNotFoundException(f"Build {build_id} does not exist")
class CodeBuildResponse(BaseResponse):
@ -105,10 +103,9 @@ class CodeBuildResponse(BaseResponse):
self._get_param("projectName")
not in self.codebuild_backend.codebuild_projects.keys()
):
name = self._get_param("projectName")
raise ResourceNotFoundException(
"The provided project arn:aws:codebuild:{0}:{1}:project/{2} does not exist".format(
self.region, self.current_account, self._get_param("projectName")
)
f"The provided project arn:aws:codebuild:{self.region}:{self.current_account}:project/{name} does not exist"
)
ids = self.codebuild_backend.list_builds_for_project(
@ -127,10 +124,9 @@ class CodeBuildResponse(BaseResponse):
_validate_required_params_project_name(self._get_param("name"))
if self._get_param("name") in self.codebuild_backend.codebuild_projects.keys():
name = self._get_param("name")
raise ResourceAlreadyExistsException(
"Project already exists: arn:aws:codebuild:{0}:{1}:project/{2}".format(
self.region, self.current_account, self._get_param("name")
)
f"Project already exists: arn:aws:codebuild:{self.region}:{self.current_account}:project/{name}"
)
project_metadata = self.codebuild_backend.create_project(
@ -154,10 +150,9 @@ class CodeBuildResponse(BaseResponse):
self._get_param("projectName")
not in self.codebuild_backend.codebuild_projects.keys()
):
name = self._get_param("projectName")
raise ResourceNotFoundException(
"Project cannot be found: arn:aws:codebuild:{0}:{1}:project/{2}".format(
self.region, self.current_account, self._get_param("projectName")
)
f"Project cannot be found: arn:aws:codebuild:{self.region}:{self.current_account}:project/{name}"
)
metadata = self.codebuild_backend.start_build(

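One detail worth noting in the codebuild responses hunks above: the value of self._get_param(...) is first bound to a local name, and the f-string interpolates that local. Presumably this is because, before Python 3.12, an f-string expression cannot reuse the quote character that delimits the literal, so a double-quoted call could not be nested inside a double-quoted f-string. A hedged sketch with a hypothetical get_param helper:

    params = {"projectName": "my-project"}

    def get_param(key: str) -> str:
        return params[key]

    # Before Python 3.12 this line is a SyntaxError - the inner double quotes
    # end the f-string literal early:
    #   arn = f"arn:aws:codebuild:::project/{get_param("projectName")}"

    name = get_param("projectName")                  # bind to a local first...
    arn = f"arn:aws:codebuild:::project/{name}"
    alt = f"arn:aws:codebuild:::project/{get_param('projectName')}"  # ...or swap quote styles

    assert arn == alt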
View File

@ -7,7 +7,7 @@ class RepositoryNameExistsException(JsonRESTError):
def __init__(self, repository_name: str):
super().__init__(
"RepositoryNameExistsException",
"Repository named {0} already exists".format(repository_name),
f"Repository named {repository_name} already exists",
)
@ -16,8 +16,7 @@ class RepositoryDoesNotExistException(JsonRESTError):
def __init__(self, repository_name: str):
super().__init__(
"RepositoryDoesNotExistException",
"{0} does not exist".format(repository_name),
"RepositoryDoesNotExistException", f"{repository_name} does not exist"
)

View File

@ -19,14 +19,10 @@ class CodeCommit(BaseModel):
self.repository_metadata["repositoryName"] = repository_name
self.repository_metadata[
"cloneUrlSsh"
] = "ssh://git-codecommit.{0}.amazonaws.com/v1/repos/{1}".format(
region, repository_name
)
] = f"ssh://git-codecommit.{region}.amazonaws.com/v1/repos/{repository_name}"
self.repository_metadata[
"cloneUrlHttp"
] = "https://git-codecommit.{0}.amazonaws.com/v1/repos/{1}".format(
region, repository_name
)
] = f"https://git-codecommit.{region}.amazonaws.com/v1/repos/{repository_name}"
self.repository_metadata["creationDate"] = current_date
self.repository_metadata["lastModifiedDate"] = current_date
self.repository_metadata["repositoryDescription"] = repository_description

View File

@ -34,5 +34,5 @@ class TooManyTagsException(JsonRESTError):
def __init__(self, arn: str):
super().__init__(
"TooManyTagsException", "Tag limit exceeded for resource [{}].".format(arn)
"TooManyTagsException", f"Tag limit exceeded for resource [{arn}]."
)

View File

@ -101,9 +101,7 @@ class CodePipelineBackend(BaseBackend):
raise IAMNotFoundException("")
except IAMNotFoundException:
raise InvalidStructureException(
"CodePipeline is not authorized to perform AssumeRole on role {}".format(
pipeline["roleArn"]
)
f"CodePipeline is not authorized to perform AssumeRole on role {pipeline['roleArn']}"
)
if len(pipeline["stages"]) < 2:

View File

@ -2,4 +2,4 @@ from moto.moto_api._internal import mock_random
def get_random_identity_id(region: str) -> str:
return "{0}:{1}".format(region, mock_random.uuid4())
return f"{region}:{mock_random.uuid4()}"

View File

@ -383,7 +383,7 @@ class CognitoIdpUserPool(BaseModel):
user_pool_id = generate_id(
get_cognito_idp_user_pool_id_strategy(), region, name, extended_config
)
self.id = "{}_{}".format(self.region, user_pool_id)[: self.MAX_ID_LENGTH]
self.id = f"{self.region}_{user_pool_id}"[: self.MAX_ID_LENGTH]
self.arn = f"arn:aws:cognito-idp:{self.region}:{account_id}:userpool/{self.id}"
self.name = name
@ -530,9 +530,7 @@ class CognitoIdpUserPool(BaseModel):
) -> Tuple[str, int]:
now = int(time.time())
payload = {
"iss": "https://cognito-idp.{}.amazonaws.com/{}".format(
self.region, self.id
),
"iss": f"https://cognito-idp.{self.region}.amazonaws.com/{self.id}",
"sub": self._get_user(username).id,
"aud": client_id,
"token_use": token_use,
@ -1654,7 +1652,7 @@ class CognitoIdpBackend(BaseBackend):
if identifier in user_pool.resource_servers:
raise InvalidParameterException(
"%s already exists in user pool %s." % (identifier, user_pool_id)
f"{identifier} already exists in user pool {user_pool_id}."
)
resource_server = CognitoResourceServer(user_pool_id, identifier, name, scopes)

View File

@ -185,11 +185,10 @@ class TooManyAccountSources(JsonRESTError):
def __init__(self, length: int):
locations = ["com.amazonaws.xyz"] * length
locs = ", ".join(locations)
message = (
"Value '[{locations}]' at 'accountAggregationSources' failed to satisfy constraint: "
"Member must have length less than or equal to 1".format(
locations=", ".join(locations)
)
f"Value '[{locs}]' at 'accountAggregationSources' failed to satisfy constraint: "
"Member must have length less than or equal to 1"
)
super().__init__("ValidationException", message)

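In the TooManyAccountSources hunk above, only the first of two adjacent string literals gains the f prefix; the second has no placeholders and stays plain. That works because Python concatenates adjacent literals at compile time regardless of prefix, a pattern this commit relies on in several long messages. A short illustration with a made-up value:

    length = 3
    message = (
        f"Value '[{length}]' at 'accountAggregationSources' failed to satisfy constraint: "
        "Member must have length less than or equal to 1"  # plain literal, joined at compile time
    )
    print(message)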
View File

@ -420,15 +420,7 @@ class ConfigAggregationAuthorization(ConfigEmptyDictable):
):
super().__init__(capitalize_start=True, capitalize_arn=False)
self.aggregation_authorization_arn = (
"arn:aws:config:{region}:{id}:aggregation-authorization/"
"{auth_account}/{auth_region}".format(
region=current_region,
id=account_id,
auth_account=authorized_account_id,
auth_region=authorized_aws_region,
)
)
self.aggregation_authorization_arn = f"arn:aws:config:{current_region}:{account_id}:aggregation-authorization/{authorized_account_id}/{authorized_aws_region}"
self.authorized_account_id = authorized_account_id
self.authorized_aws_region = authorized_aws_region
self.creation_time = datetime2int(datetime.utcnow())
@ -451,7 +443,7 @@ class OrganizationConformancePack(ConfigEmptyDictable):
super().__init__(capitalize_start=True, capitalize_arn=False)
self._status = "CREATE_SUCCESSFUL"
self._unique_pack_name = "{0}-{1}".format(name, random_string())
self._unique_pack_name = f"{name}-{random_string()}"
self.conformance_pack_input_parameters = input_parameters or []
self.delivery_s3_bucket = delivery_s3_bucket
@ -1091,7 +1083,7 @@ class ConfigBackend(BaseBackend):
tag_dict = validate_tags(tags or [])
# Does this already exist?
key = "{}/{}".format(authorized_account, authorized_region)
key = f"{authorized_account}/{authorized_region}"
agg_auth = self.aggregation_authorizations.get(key)
if not agg_auth:
agg_auth = ConfigAggregationAuthorization(
@ -1101,9 +1093,7 @@ class ConfigBackend(BaseBackend):
authorized_region,
tags=tag_dict,
)
self.aggregation_authorizations[
"{}/{}".format(authorized_account, authorized_region)
] = agg_auth
self.aggregation_authorizations[key] = agg_auth
else:
# Only update the tags:
agg_auth.tags = tag_dict
@ -1148,7 +1138,7 @@ class ConfigBackend(BaseBackend):
) -> None:
# This will always return a 200 -- regardless if there is or isn't an existing
# aggregation authorization.
key = "{}/{}".format(authorized_account, authorized_region)
key = f"{authorized_account}/{authorized_region}"
self.aggregation_authorizations.pop(key, None)
def put_configuration_recorder(self, config_recorder: Dict[str, Any]) -> None:
@ -1725,9 +1715,9 @@ class ConfigBackend(BaseBackend):
if not re.match(r"s3://.*", template_s3_uri):
raise ValidationException(
"1 validation error detected: "
"Value '{}' at 'templateS3Uri' failed to satisfy constraint: "
f"Value '{template_s3_uri}' at 'templateS3Uri' failed to satisfy constraint: "
"Member must satisfy regular expression pattern: "
"s3://.*".format(template_s3_uri)
"s3://.*"
)
pack = self.organization_conformance_packs.get(name)
@ -1822,9 +1812,7 @@ class ConfigBackend(BaseBackend):
statuses = [
{
"AccountId": self.account_id,
"ConformancePackName": "OrgConformsPack-{0}".format(
pack._unique_pack_name
),
"ConformancePackName": f"OrgConformsPack-{pack._unique_pack_name}",
"Status": pack._status,
"LastUpdateTime": datetime2int(datetime.utcnow()),
}
@ -1838,7 +1826,7 @@ class ConfigBackend(BaseBackend):
if not pack:
raise NoSuchOrganizationConformancePackException(
"Could not find an OrganizationConformancePack for given "
"request with resourceName {}".format(name)
f"request with resourceName {name}"
)
self.organization_conformance_packs.pop(name)

View File

@ -43,9 +43,7 @@ class CallbackResponse(responses.CallbackResponse):
content_length=request.headers.get("Content-Length"),
content_type=request.headers.get("Content-Type"),
method=request.method,
base_url="{scheme}://{netloc}".format(
scheme=url.scheme, netloc=url.netloc
),
base_url=f"{url.scheme}://{url.netloc}",
headers=[(k, v) for k, v in request.headers.items()],
)
request = req

View File

@ -63,7 +63,7 @@ class RESTError(HTTPException):
error_type=error_type,
message=message,
request_id_tag=self.request_id_tag_name,
**kwargs
**kwargs,
)
self.content_type = "application/xml"
@ -124,10 +124,7 @@ class AccessDeniedError(RESTError):
def __init__(self, user_arn: str, action: str):
super().__init__(
"AccessDenied",
"User: {user_arn} is not authorized to perform: {operation}".format(
user_arn=user_arn, operation=action
),
"AccessDenied", f"User: {user_arn} is not authorized to perform: {action}"
)
@ -170,4 +167,4 @@ class InvalidToken(AWSError):
code = 400
def __init__(self, message: str = "Invalid token"):
super().__init__("Invalid Token: {}".format(message), "InvalidToken")
super().__init__(f"Invalid Token: {message}", "InvalidToken")

View File

@ -385,17 +385,15 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
.replace("-", "_")
)
if is_last:
return "(?P<%s>[^/]+)" % name
return "(?P<%s>.*)" % name
return f"(?P<{name}>[^/]+)"
return f"(?P<{name}>.*)"
elems = uri.split("/")
num_elems = len(elems)
regexp = "^{}$".format(
"/".join(
[_convert(elem, (i == num_elems - 1)) for i, elem in enumerate(elems)]
)
regexp = "/".join(
[_convert(elem, (i == num_elems - 1)) for i, elem in enumerate(elems)]
)
return regexp
return f"^{regexp}$"
def _get_action_from_method_and_request_uri(
self, method: str, request_uri: str
@ -470,9 +468,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
if not action:
return 404, headers, ""
raise NotImplementedError(
"The {0} action has not been implemented".format(action)
)
raise NotImplementedError(f"The {action} action has not been implemented")
@staticmethod
def _send_response(headers: Dict[str, str], response: Any) -> Tuple[int, Dict[str, str], str]: # type: ignore[misc]
@ -569,10 +565,8 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
match = self.param_regex.search(name[len(param_prefix) :])
if match:
# enable access to params that are lists of dicts, e.g., "TagSpecification.1.ResourceType=.."
sub_attr = "%s%s.%s" % (
name[: len(param_prefix)],
match.group(1),
match.group(2),
sub_attr = (
f"{name[: len(param_prefix)]}{match.group(1)}.{match.group(2)}"
)
if match.group(3):
value = self._get_multi_param_helper(
@ -753,7 +747,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
results = []
param_index = 1
while True:
index_prefix = "{0}.{1}.".format(param_prefix, param_index)
index_prefix = f"{param_prefix}.{param_index}."
new_items = {}
for key, value in self.querystring.items():
if key.startswith(index_prefix):
@ -772,7 +766,7 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
results = {}
param_index = 1
while 1:
index_prefix = "{0}.{1}.".format(param_prefix, param_index)
index_prefix = f"{param_prefix}.{param_index}."
k, v = None, None
for key, value in self.querystring.items():
@ -820,14 +814,14 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
index = 1
while True:
# Loop through looking for keys representing object name
name_key = "{0}.{1}.{2}".format(prefix, index, name)
name_key = f"{prefix}.{index}.{name}"
obj_name = self.querystring.get(name_key)
if not obj_name:
# Found all keys
break
obj = {}
value_key_prefix = "{0}.{1}.{2}.".format(prefix, index, value)
value_key_prefix = f"{prefix}.{index}.{value}."
for k, v in self.querystring.items():
if k.startswith(value_key_prefix):
_, value_key = k.split(value_key_prefix, 1)
@ -862,7 +856,7 @@ class _RecursiveDictRef(object):
self.dic: Dict[str, Any] = {}
def __repr__(self) -> str:
return "{!r}".format(self.dic)
return f"{self.dic}"
def __getattr__(self, key: str) -> Any:
return self.dic.__getattr__(key) # type: ignore[attr-defined]
@ -896,7 +890,7 @@ class AWSServiceSpec(object):
try:
op = self.operations[operation]
except KeyError:
raise ValueError("Invalid operation: {}".format(operation))
raise ValueError(f"Invalid operation: {operation}")
if "input" not in op:
return {}
shape = self.shapes[op["input"]["shape"]]
@ -913,7 +907,7 @@ class AWSServiceSpec(object):
try:
op = self.operations[operation]
except KeyError:
raise ValueError("Invalid operation: {}".format(operation))
raise ValueError(f"Invalid operation: {operation}")
if "output" not in op:
return {}
shape = self.shapes[op["output"]["shape"]]
@ -998,7 +992,7 @@ def to_str(value: Any, spec: Dict[str, Any]) -> str:
elif value is None:
return "null"
else:
raise TypeError("Unknown type {}".format(vtype))
raise TypeError(f"Unknown type {vtype}")
def from_str(value: str, spec: Dict[str, Any]) -> Any:
@ -1015,7 +1009,7 @@ def from_str(value: str, spec: Dict[str, Any]) -> Any:
return value
elif vtype == "string":
return value
raise TypeError("Unknown type {}".format(vtype))
raise TypeError(f"Unknown type {vtype}")
def flatten_json_request_body(

View File

@ -73,7 +73,7 @@ def convert_regex_to_flask_path(url_path: str) -> str:
def caller(reg: Any) -> str:
match_name, match_pattern = reg.groups()
return '<regex("{0}"):{1}>'.format(match_pattern, match_name)
return f'<regex("{match_pattern}"):{match_name}>'
url_path = re.sub(r"\(\?P<(.*?)>(.*?)\)", caller, url_path)
@ -95,7 +95,7 @@ class convert_to_flask_response(object):
outer = self.callback.__self__.__class__.__name__
else:
outer = self.callback.__module__
return "{0}.{1}".format(outer, self.callback.__name__)
return f"{outer}.{self.callback.__name__}"
def __call__(self, args: Any = None, **kwargs: Any) -> Any:
from flask import request, Response
@ -130,7 +130,7 @@ class convert_flask_to_responses_response(object):
outer = self.callback.__self__.__class__.__name__
else:
outer = self.callback.__module__
return "{0}.{1}".format(outer, self.callback.__name__)
return f"{outer}.{self.callback.__name__}"
def __call__(self, request: Any, *args: Any, **kwargs: Any) -> TYPE_RESPONSE:
for key, val in request.headers.items():
@ -203,14 +203,8 @@ def tags_from_query_string(
for key in querystring_dict.keys():
if key.startswith(prefix) and key.endswith(key_suffix):
tag_index = key.replace(prefix + ".", "").replace("." + key_suffix, "")
tag_key = querystring_dict[
"{prefix}.{index}.{key_suffix}".format(
prefix=prefix, index=tag_index, key_suffix=key_suffix
)
][0]
tag_value_key = "{prefix}.{index}.{value_suffix}".format(
prefix=prefix, index=tag_index, value_suffix=value_suffix
)
tag_key = querystring_dict[f"{prefix}.{tag_index}.{key_suffix}"][0]
tag_value_key = f"{prefix}.{tag_index}.{value_suffix}"
if tag_value_key in querystring_dict:
response_values[tag_key] = querystring_dict[tag_value_key][0]
else:

View File

@ -6,7 +6,7 @@ markers =
relative_files = True
[flake8]
ignore = W503,W605,E128,E501,E203,E266,E501,E231
ignore = W503,W605,E128,E501,E203,E266,E501,E231,FS003
exclude = moto/packages,dist,tests/terraformtests
[pylint.MASTER]
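The final hunk adds FS003 to the flake8 ignore list. Assuming the code comes from the flake8-use-fstring plugin, FS003 flags string literals that contain {placeholder}-style braces but lack an f prefix; suppressing it avoids false positives on literals whose braces are intentional, for example templates that are formatted later. A hypothetical line of the kind such a check would flag:

    # Braces are intentional here - the string is a template filled in later,
    # not a forgotten f-string.
    template = "Missing parameter {0}"
    print(template.format("Name"))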