From efeb110d06923483b8505f7764676f1bcb53fe0b Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sat, 19 Nov 2022 21:32:53 -0100 Subject: [PATCH] Techdebt: Replace string-format with f-strings (for k* dirs) (#5684) --- moto/kinesis/models.py | 2 +- moto/kinesis/utils.py | 8 ++------ moto/kinesisvideo/models.py | 8 ++------ moto/kinesisvideoarchivedmedia/models.py | 3 +-- moto/kms/responses.py | 25 +++++++----------------- moto/kms/utils.py | 11 +++-------- 6 files changed, 16 insertions(+), 41 deletions(-) diff --git a/moto/kinesis/models.py b/moto/kinesis/models.py index b3850a77d..90f3672e6 100644 --- a/moto/kinesis/models.py +++ b/moto/kinesis/models.py @@ -85,7 +85,7 @@ class Shard(BaseModel): @property def shard_id(self): - return "shardId-{0}".format(str(self._shard_id).zfill(12)) + return f"shardId-{str(self._shard_id).zfill(12)}" def get_records(self, last_sequence_id, limit): last_sequence_id = int(last_sequence_id) diff --git a/moto/kinesis/utils.py b/moto/kinesis/utils.py index 9de8052a1..849d2a2e8 100644 --- a/moto/kinesis/utils.py +++ b/moto/kinesis/utils.py @@ -43,17 +43,13 @@ def compose_new_shard_iterator( elif shard_iterator_type == "AT_TIMESTAMP": last_sequence_id = shard.get_sequence_number_at(at_timestamp) else: - raise InvalidArgumentError( - "Invalid ShardIteratorType: {0}".format(shard_iterator_type) - ) + raise InvalidArgumentError(f"Invalid ShardIteratorType: {shard_iterator_type}") return compose_shard_iterator(stream_name, shard, last_sequence_id) def compose_shard_iterator(stream_name, shard, last_sequence_id): return encode_method( - "{0}:{1}:{2}".format(stream_name, shard.shard_id, last_sequence_id).encode( - "utf-8" - ) + f"{stream_name}:{shard.shard_id}:{last_sequence_id}".encode("utf-8") ).decode("utf-8") diff --git a/moto/kinesisvideo/models.py b/moto/kinesisvideo/models.py index 95345d028..8653016e1 100644 --- a/moto/kinesisvideo/models.py +++ b/moto/kinesisvideo/models.py @@ -32,9 +32,7 @@ class Stream(BaseModel): def get_data_endpoint(self, api_name): data_endpoint_prefix = "s-" if api_name in ("PUT_MEDIA", "GET_MEDIA") else "b-" - return "https://{}{}.kinesisvideo.{}.amazonaws.com".format( - data_endpoint_prefix, self.data_endpoint_number, self.region_name - ) + return f"https://{data_endpoint_prefix}{self.data_endpoint_number}.kinesisvideo.{self.region_name}.amazonaws.com" def to_dict(self): return { @@ -66,9 +64,7 @@ class KinesisVideoBackend(BaseBackend): ): streams = [_ for _ in self.streams.values() if _.stream_name == stream_name] if len(streams) > 0: - raise ResourceInUseException( - "The stream {} already exists.".format(stream_name) - ) + raise ResourceInUseException(f"The stream {stream_name} already exists.") stream = Stream( self.account_id, self.region_name, diff --git a/moto/kinesisvideoarchivedmedia/models.py b/moto/kinesisvideoarchivedmedia/models.py index 2d5a12dab..20ca92770 100644 --- a/moto/kinesisvideoarchivedmedia/models.py +++ b/moto/kinesisvideoarchivedmedia/models.py @@ -17,8 +17,7 @@ class KinesisVideoArchivedMediaBackend(BaseBackend): "GET_DASH_STREAMING_SESSION_URL": "/dash/v1/getDASHManifest.mpd", } relative_path = api_to_relative_path[api_name] - url = "{}{}?SessionToken={}".format(data_endpoint, relative_path, session_token) - return url + return f"{data_endpoint}{relative_path}?SessionToken={session_token}" def get_hls_streaming_session_url(self, stream_name, stream_arn): # Ignore option paramters as the format of hls_url does't depends on them diff --git a/moto/kms/responses.py b/moto/kms/responses.py index 4784cb794..f2dc70eb7 100644 --- a/moto/kms/responses.py +++ b/moto/kms/responses.py @@ -61,14 +61,12 @@ class KmsResponse(BaseResponse): ) if not is_arn and not is_raw_key_id: - raise NotFoundException("Invalid keyId {key_id}".format(key_id=key_id)) + raise NotFoundException(f"Invalid keyId {key_id}") cmk_id = self.kms_backend.get_key_id(key_id) if cmk_id not in self.kms_backend.keys: - raise NotFoundException( - "Key '{key_id}' does not exist".format(key_id=self._display_arn(key_id)) - ) + raise NotFoundException(f"Key '{self._display_arn(key_id)}' does not exist") def _validate_alias(self, key_id): """Determine whether an alias exists. @@ -76,9 +74,7 @@ class KmsResponse(BaseResponse): - alias name - alias ARN """ - error = NotFoundException( - "Alias {key_id} is not found.".format(key_id=self._display_arn(key_id)) - ) + error = NotFoundException(f"Alias {self._display_arn(key_id)} is not found.") is_arn = key_id.startswith("arn:") and ":alias/" in key_id is_name = key_id.startswith("alias/") @@ -211,16 +207,14 @@ class KmsResponse(BaseResponse): if ":" in alias_name: raise ValidationException( - "{alias_name} contains invalid characters for an alias".format( - alias_name=alias_name - ) + f"{alias_name} contains invalid characters for an alias" ) if not re.match(r"^[a-zA-Z0-9:/_-]+$", alias_name): raise ValidationException( - "1 validation error detected: Value '{alias_name}' at 'aliasName' " + f"1 validation error detected: Value '{alias_name}' at 'aliasName' " "failed to satisfy constraint: Member must satisfy regular " - "expression pattern: ^[a-zA-Z0-9:/_-]+$".format(alias_name=alias_name) + "expression pattern: ^[a-zA-Z0-9:/_-]+$" ) if self.kms_backend.alias_exists(target_key_id): @@ -232,12 +226,7 @@ class KmsResponse(BaseResponse): if self.kms_backend.alias_exists(alias_name): raise AlreadyExistsException( - "An alias with the name arn:aws:kms:{region}:{account_id}:{alias_name} " - "already exists".format( - region=self.region, - account_id=self.current_account, - alias_name=alias_name, - ) + f"An alias with the name arn:aws:kms:{self.region}:{self.current_account}:{alias_name} already exists" ) self._validate_cmk_id(target_key_id) diff --git a/moto/kms/utils.py b/moto/kms/utils.py index c7fc27409..5d1e81546 100644 --- a/moto/kms/utils.py +++ b/moto/kms/utils.py @@ -22,9 +22,7 @@ IV_LEN = 12 TAG_LEN = 16 HEADER_LEN = KEY_ID_LEN + IV_LEN + TAG_LEN # NOTE: This is just a simple binary format. It is not what KMS actually does. -CIPHERTEXT_HEADER_FORMAT = ">{key_id_len}s{iv_len}s{tag_len}s".format( - key_id_len=KEY_ID_LEN, iv_len=IV_LEN, tag_len=TAG_LEN -) +CIPHERTEXT_HEADER_FORMAT = f">{KEY_ID_LEN}s{IV_LEN}s{TAG_LEN}s" Ciphertext = namedtuple("Ciphertext", ("key_id", "iv", "ciphertext", "tag")) RESERVED_ALIASES = [ @@ -134,11 +132,8 @@ def encrypt(master_keys, key_id, plaintext, encryption_context): key = master_keys[key_id] except KeyError: is_alias = key_id.startswith("alias/") or ":alias/" in key_id - raise NotFoundException( - "{id_type} {key_id} is not found.".format( - id_type="Alias" if is_alias else "keyId", key_id=key_id - ) - ) + id_type = "Alias" if is_alias else "keyId" + raise NotFoundException(f"{id_type} {key_id} is not found.") if plaintext == b"": raise ValidationException(