Techdebt: Replace string-format with f-strings (for k* dirs) (#5684)

This commit is contained in:
Bert Blommers 2022-11-19 21:32:53 -01:00 committed by GitHub
parent 2c8e795991
commit efeb110d06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 16 additions and 41 deletions

View File

@ -85,7 +85,7 @@ class Shard(BaseModel):
@property
def shard_id(self):
return "shardId-{0}".format(str(self._shard_id).zfill(12))
return f"shardId-{str(self._shard_id).zfill(12)}"
def get_records(self, last_sequence_id, limit):
last_sequence_id = int(last_sequence_id)

View File

@ -43,17 +43,13 @@ def compose_new_shard_iterator(
elif shard_iterator_type == "AT_TIMESTAMP":
last_sequence_id = shard.get_sequence_number_at(at_timestamp)
else:
raise InvalidArgumentError(
"Invalid ShardIteratorType: {0}".format(shard_iterator_type)
)
raise InvalidArgumentError(f"Invalid ShardIteratorType: {shard_iterator_type}")
return compose_shard_iterator(stream_name, shard, last_sequence_id)
def compose_shard_iterator(stream_name, shard, last_sequence_id):
return encode_method(
"{0}:{1}:{2}".format(stream_name, shard.shard_id, last_sequence_id).encode(
"utf-8"
)
f"{stream_name}:{shard.shard_id}:{last_sequence_id}".encode("utf-8")
).decode("utf-8")

View File

@ -32,9 +32,7 @@ class Stream(BaseModel):
def get_data_endpoint(self, api_name):
data_endpoint_prefix = "s-" if api_name in ("PUT_MEDIA", "GET_MEDIA") else "b-"
return "https://{}{}.kinesisvideo.{}.amazonaws.com".format(
data_endpoint_prefix, self.data_endpoint_number, self.region_name
)
return f"https://{data_endpoint_prefix}{self.data_endpoint_number}.kinesisvideo.{self.region_name}.amazonaws.com"
def to_dict(self):
return {
@ -66,9 +64,7 @@ class KinesisVideoBackend(BaseBackend):
):
streams = [_ for _ in self.streams.values() if _.stream_name == stream_name]
if len(streams) > 0:
raise ResourceInUseException(
"The stream {} already exists.".format(stream_name)
)
raise ResourceInUseException(f"The stream {stream_name} already exists.")
stream = Stream(
self.account_id,
self.region_name,

View File

@ -17,8 +17,7 @@ class KinesisVideoArchivedMediaBackend(BaseBackend):
"GET_DASH_STREAMING_SESSION_URL": "/dash/v1/getDASHManifest.mpd",
}
relative_path = api_to_relative_path[api_name]
url = "{}{}?SessionToken={}".format(data_endpoint, relative_path, session_token)
return url
return f"{data_endpoint}{relative_path}?SessionToken={session_token}"
def get_hls_streaming_session_url(self, stream_name, stream_arn):
# Ignore option paramters as the format of hls_url does't depends on them

View File

@ -61,14 +61,12 @@ class KmsResponse(BaseResponse):
)
if not is_arn and not is_raw_key_id:
raise NotFoundException("Invalid keyId {key_id}".format(key_id=key_id))
raise NotFoundException(f"Invalid keyId {key_id}")
cmk_id = self.kms_backend.get_key_id(key_id)
if cmk_id not in self.kms_backend.keys:
raise NotFoundException(
"Key '{key_id}' does not exist".format(key_id=self._display_arn(key_id))
)
raise NotFoundException(f"Key '{self._display_arn(key_id)}' does not exist")
def _validate_alias(self, key_id):
"""Determine whether an alias exists.
@ -76,9 +74,7 @@ class KmsResponse(BaseResponse):
- alias name
- alias ARN
"""
error = NotFoundException(
"Alias {key_id} is not found.".format(key_id=self._display_arn(key_id))
)
error = NotFoundException(f"Alias {self._display_arn(key_id)} is not found.")
is_arn = key_id.startswith("arn:") and ":alias/" in key_id
is_name = key_id.startswith("alias/")
@ -211,16 +207,14 @@ class KmsResponse(BaseResponse):
if ":" in alias_name:
raise ValidationException(
"{alias_name} contains invalid characters for an alias".format(
alias_name=alias_name
)
f"{alias_name} contains invalid characters for an alias"
)
if not re.match(r"^[a-zA-Z0-9:/_-]+$", alias_name):
raise ValidationException(
"1 validation error detected: Value '{alias_name}' at 'aliasName' "
f"1 validation error detected: Value '{alias_name}' at 'aliasName' "
"failed to satisfy constraint: Member must satisfy regular "
"expression pattern: ^[a-zA-Z0-9:/_-]+$".format(alias_name=alias_name)
"expression pattern: ^[a-zA-Z0-9:/_-]+$"
)
if self.kms_backend.alias_exists(target_key_id):
@ -232,12 +226,7 @@ class KmsResponse(BaseResponse):
if self.kms_backend.alias_exists(alias_name):
raise AlreadyExistsException(
"An alias with the name arn:aws:kms:{region}:{account_id}:{alias_name} "
"already exists".format(
region=self.region,
account_id=self.current_account,
alias_name=alias_name,
)
f"An alias with the name arn:aws:kms:{self.region}:{self.current_account}:{alias_name} already exists"
)
self._validate_cmk_id(target_key_id)

View File

@ -22,9 +22,7 @@ IV_LEN = 12
TAG_LEN = 16
HEADER_LEN = KEY_ID_LEN + IV_LEN + TAG_LEN
# NOTE: This is just a simple binary format. It is not what KMS actually does.
CIPHERTEXT_HEADER_FORMAT = ">{key_id_len}s{iv_len}s{tag_len}s".format(
key_id_len=KEY_ID_LEN, iv_len=IV_LEN, tag_len=TAG_LEN
)
CIPHERTEXT_HEADER_FORMAT = f">{KEY_ID_LEN}s{IV_LEN}s{TAG_LEN}s"
Ciphertext = namedtuple("Ciphertext", ("key_id", "iv", "ciphertext", "tag"))
RESERVED_ALIASES = [
@ -134,11 +132,8 @@ def encrypt(master_keys, key_id, plaintext, encryption_context):
key = master_keys[key_id]
except KeyError:
is_alias = key_id.startswith("alias/") or ":alias/" in key_id
raise NotFoundException(
"{id_type} {key_id} is not found.".format(
id_type="Alias" if is_alias else "keyId", key_id=key_id
)
)
id_type = "Alias" if is_alias else "keyId"
raise NotFoundException(f"{id_type} {key_id} is not found.")
if plaintext == b"":
raise ValidationException(