Techdebt: Replace string-format with f-strings (for d* dirs) (#5662)

Bert Blommers 2022-11-12 21:42:33 -01:00 committed by GitHub
parent b0b943949d
commit 52892f5481
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 79 additions and 153 deletions
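
The change is mechanical throughout: printf-style % interpolation and str.format() calls become f-strings. A standalone Python sketch of the three styles, with an illustrative value that is not taken from moto:

# All three lines print the same message.
typ = "Recipe"
print("%s already exists." % typ)         # printf-style
print("{0} already exists.".format(typ))  # str.format
print(f"{typ} already exists.")           # f-string, Python 3.6+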

View File

@@ -7,7 +7,7 @@ class DataBrewClientError(JsonRESTError):
class AlreadyExistsException(DataBrewClientError):
def __init__(self, typ):
super().__init__("AlreadyExistsException", "%s already exists." % (typ))
super().__init__("AlreadyExistsException", f"{typ} already exists.")
class ConflictException(DataBrewClientError):
@@ -41,7 +41,7 @@ class ResourceNotFoundException(DataBrewClientError):
class RulesetNotFoundException(EntityNotFoundException):
def __init__(self, recipe_name):
super().__init__("Ruleset %s not found." % recipe_name)
super().__init__(f"Ruleset {recipe_name} not found.")
class ServiceQuotaExceededException(JsonRESTError):
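
Incidentally, the removed line above shows why %-formatting is brittle: the parentheses in "%s" % (typ) do not create a tuple, so a tuple-valued argument breaks. A sketch of the pitfall with a made-up tuple value:

typ = ("Recipe", "v1")                # hypothetical tuple argument
# "%s already exists." % (typ)        # TypeError: not all arguments converted
print("%s already exists." % (typ,))  # works: explicit one-element tuple
print(f"{typ} already exists.")       # f-strings take the expression as-is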

View File

@@ -488,12 +488,12 @@ class FakeRecipeVersion(BaseModel):
"Name": self.name,
"Steps": self.steps,
"Description": self.description,
"CreateDate": "%.3f" % self.created_time.timestamp(),
"CreateDate": f"{self.created_time.timestamp():.3f}",
"Tags": self.tags or dict(),
"RecipeVersion": str(self.version),
}
if self.published_date is not None:
dict_recipe["PublishedDate"] = "%.3f" % self.published_date.timestamp()
dict_recipe["PublishedDate"] = f"{self.published_date.timestamp():.3f}"
return dict_recipe
@@ -529,7 +529,7 @@ class FakeRuleset(BaseModel):
"Rules": self.rules,
"Description": self.description,
"TargetArn": self.target_arn,
"CreateDate": "%.3f" % self.created_time.timestamp(),
"CreateDate": f"{self.created_time.timestamp():.3f}",
"Tags": self.tags or dict(),
}
@@ -569,7 +569,7 @@ class FakeDataset(BaseModel):
"FormatOptions": self.format_options,
"Input": self.input,
"PathOptions": self.path_options,
"CreateDate": "%.3f" % self.created_time.timestamp(),
"CreateDate": f"{self.created_time.timestamp():.3f}",
"Tags": self.tags or dict(),
"ResourceArn": self.resource_arn,
}
@@ -631,7 +631,7 @@ class FakeJob(BaseModel, metaclass=BaseModelABCMeta):
rtn_dict = {
"Name": self.name,
"AccountId": self.account_id,
"CreateDate": "%.3f" % self.created_time.timestamp(),
"CreateDate": f"{self.created_time.timestamp():.3f}",
"DatasetName": self.dataset_name,
"EncryptionMode": self.encryption_mode,
"Tags": self.tags or dict(),

View File

@@ -3,7 +3,7 @@ from moto.moto_api._internal import mock_random
def get_random_pipeline_id():
return "df-{0}".format(mock_random.get_random_hex(length=19))
return f"df-{mock_random.get_random_hex(length=19)}"
def remove_capitalization_of_dict_keys(obj):
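
The replacement field here holds a full call expression, keyword argument included. A runnable sketch with a local stand-in for moto's mock_random helper:

import random

def get_random_hex(length=19):  # stand-in for mock_random.get_random_hex
    return "".join(random.choice("0123456789abcdef") for _ in range(length))

print(f"df-{get_random_hex(length=19)}")  # e.g. df-a91f03c2b4d56e7f801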

View File

@@ -13,9 +13,7 @@ class Location(BaseModel):
self.metadata = metadata
self.typ = typ
# Generate ARN
self.arn = "arn:aws:datasync:{0}:111222333444:location/loc-{1}".format(
region_name, str(arn_counter).zfill(17)
)
self.arn = f"arn:aws:datasync:{region_name}:111222333444:location/loc-{str(arn_counter).zfill(17)}"
class Task(BaseModel):
@@ -36,9 +34,7 @@ class Task(BaseModel):
self.status = "AVAILABLE"
self.current_task_execution_arn = None
# Generate ARN
self.arn = "arn:aws:datasync:{0}:111222333444:task/task-{1}".format(
region_name, str(arn_counter).zfill(17)
)
self.arn = f"arn:aws:datasync:{region_name}:111222333444:task/task-{str(arn_counter).zfill(17)}"
class TaskExecution(BaseModel):
@@ -63,7 +59,7 @@ class TaskExecution(BaseModel):
def __init__(self, task_arn, arn_counter=0):
self.task_arn = task_arn
self.arn = "{0}/execution/exec-{1}".format(task_arn, str(arn_counter).zfill(17))
self.arn = f"{task_arn}/execution/exec-{str(arn_counter).zfill(17)}"
self.status = self.TASK_EXECUTION_INTERMEDIATE_STATES[0]
# Simulate a task execution
@@ -80,16 +76,12 @@ class TaskExecution(BaseModel):
else:
self.status = self.TASK_EXECUTION_SUCCESS_STATES[0]
return
raise Exception(
"TaskExecution.iterate_status: Unknown status={0}".format(self.status)
)
raise Exception(f"TaskExecution.iterate_status: Unknown status={self.status}")
def cancel(self):
if self.status not in self.TASK_EXECUTION_INTERMEDIATE_STATES:
raise InvalidRequestException(
"Sync task cannot be cancelled in its current status: {0}".format(
self.status
)
f"Sync task cannot be cancelled in its current status: {self.status}"
)
self.status = "ERROR"
@@ -133,14 +125,10 @@ class DataSyncBackend(BaseBackend):
def _get_location(self, location_arn, typ):
if location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} is not found.".format(location_arn)
)
raise InvalidRequestException(f"Location {location_arn} is not found.")
location = self.locations[location_arn]
if location.typ != typ:
raise InvalidRequestException(
"Invalid Location type: {0}".format(location.typ)
)
raise InvalidRequestException(f"Invalid Location type: {location.typ}")
return location
def delete_location(self, location_arn):
@@ -153,12 +141,10 @@ class DataSyncBackend(BaseBackend):
self, source_location_arn, destination_location_arn, name, metadata=None
):
if source_location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} not found.".format(source_location_arn)
)
raise InvalidRequestException(f"Location {source_location_arn} not found.")
if destination_location_arn not in self.locations:
raise InvalidRequestException(
"Location {0} not found.".format(destination_location_arn)
f"Location {destination_location_arn} not found."
)
self.arn_counter = self.arn_counter + 1
task = Task(
@@ -184,9 +170,7 @@ class DataSyncBackend(BaseBackend):
task.name = name
task.metadata = metadata
else:
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_arn)
)
raise InvalidRequestException(f"Sync task {task_arn} is not found.")
def delete_task(self, task_arn):
if task_arn in self.tasks:
@@ -220,9 +204,7 @@ class DataSyncBackend(BaseBackend):
self.tasks[task_arn].current_task_execution_arn = None
self.tasks[task_arn].status = "AVAILABLE"
return
raise InvalidRequestException(
"Sync task {0} is not found.".format(task_execution_arn)
)
raise InvalidRequestException(f"Sync task {task_execution_arn} is not found.")
datasync_backends = BackendDict(DataSyncBackend, "datasync")
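
The ARN lines keep str(arn_counter).zfill(17) inside the braces; for an integer counter the format spec 017d is an equivalent, shorter spelling, though not the one this commit uses. A quick check with illustrative values:

arn_counter, region_name = 42, "us-east-1"
a = f"arn:aws:datasync:{region_name}:111222333444:location/loc-{str(arn_counter).zfill(17)}"
b = f"arn:aws:datasync:{region_name}:111222333444:location/loc-{arn_counter:017d}"
assert a == b  # both pad to .../loc-00000000000000042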

View File

@@ -83,10 +83,10 @@ class Op(object):
self.rhs = rhs
def expr(self, item):
raise NotImplementedError("Expr not defined for {0}".format(type(self)))
raise NotImplementedError(f"Expr not defined for {type(self)}")
def __repr__(self):
return "({0} {1} {2})".format(self.lhs, self.OP, self.rhs)
return f"({self.lhs} {self.OP} {self.rhs})"
# TODO add tests for all of these
@@ -276,11 +276,7 @@ class ConditionExpressionParser:
),
(
self.Nonterminal.OPERAND,
re.compile(
r"^{attribute_regex}(\.{attribute_regex}|\[[0-9]\])*".format(
attribute_regex=attribute_regex
)
),
re.compile(rf"^{attribute_regex}(\.{attribute_regex}|\[[0-9]\])*"),
),
(self.Nonterminal.COMMA, re.compile(r"^,")),
(self.Nonterminal.LEFT_PAREN, re.compile(r"^\(")),
@@ -294,7 +290,7 @@ class ConditionExpressionParser:
break
else: # pragma: no cover
raise ValueError(
"Cannot parse condition starting at:{}".format(remaining_expression)
f"Cannot parse condition starting at:{remaining_expression}"
)
node = self.Node(
@@ -327,7 +323,7 @@ class ConditionExpressionParser:
for child in children:
self._assert(
child.nonterminal == self.Nonterminal.IDENTIFIER,
"Cannot use {} in path".format(child.text),
f"Cannot use {child.text} in path",
[node],
)
output.append(
@@ -401,7 +397,7 @@ class ConditionExpressionParser:
elif name.startswith("["):
# e.g. [123]
if not name.endswith("]"): # pragma: no cover
raise ValueError("Bad path element {}".format(name))
raise ValueError(f"Bad path element {name}")
return self.Node(
nonterminal=self.Nonterminal.IDENTIFIER,
kind=self.Kind.LITERAL,
@@ -640,7 +636,7 @@ class ConditionExpressionParser:
for i in range(len(expected_kinds)):
self._assert(
arguments[i].kind in expected_kinds[i],
"Wrong type for argument %d in" % i,
f"Wrong type for argument {i} in",
all_children,
)
if function_name.value == "size":
@@ -809,7 +805,7 @@ class ConditionExpressionParser:
arguments = [self._make_operand(arg) for arg in arguments]
return FUNC_CLASS[function_name](*arguments)
else: # pragma: no cover
raise ValueError("Unknown operand: %r" % node)
raise ValueError(f"Unknown operand: {node}")
def _make_op_condition(self, node):
if node.kind == self.Kind.OR:
@@ -849,7 +845,7 @@ class ConditionExpressionParser:
self._make_operand(lhs), self._make_operand(rhs)
)
else: # pragma: no cover
raise ValueError("Unknown expression node kind %r" % node.kind)
raise ValueError(f"Unknown expression node kind {node.kind}")
def _assert(self, condition, message, nodes):
if not condition:
@@ -969,7 +965,7 @@ class OpNot(Op):
return not lhs
def __str__(self):
return "({0} {1})".format(self.OP, self.lhs)
return f"({self.OP} {self.lhs})"
class OpAnd(Op):
@@ -1072,9 +1068,7 @@ class Func(object):
raise NotImplementedError
def __repr__(self):
return "{0}({1})".format(
self.FUNC, " ".join([repr(arg) for arg in self.arguments])
)
return f"{self.FUNC}({' '.join([repr(arg) for arg in self.arguments])})"
class FuncAttrExists(Func):
@@ -1150,7 +1144,7 @@ class FuncSize(Func):
def expr(self, item):
if self.attr.get_type(item) is None:
raise ValueError("Invalid attribute name {0}".format(self.attr))
raise ValueError(f"Invalid attribute name {self.attr}")
if self.attr.get_type(item) in ("S", "SS", "NS", "B", "BS", "L", "M"):
return len(self.attr.expr(item))
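
Two details in this file are worth flagging. String prefixes combine, so rf"..." is simultaneously raw (for the regex escapes) and formatted (for the interpolated sub-pattern). And %r is not quite a bare {}: %r formats with repr() while {} formats with str(), so the two "Unknown ..." messages above lose the quotes around string operands; the literal equivalent would be {node!r}. A sketch with a stand-in pattern:

import re

attribute_regex = r"[A-Za-z0-9_\-]+"  # stand-in for moto's actual sub-pattern
pattern = re.compile(rf"^{attribute_regex}(\.{attribute_regex}|\[[0-9]\])*")
print(bool(pattern.match("Pictures.front[0]")))  # True

node = "size"
print("Unknown operand: %r" % node)  # Unknown operand: 'size'
print(f"Unknown operand: {node!r}")  # Unknown operand: 'size'
print(f"Unknown operand: {node}")    # Unknown operand: size  (as converted)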

View File

@@ -157,18 +157,14 @@ class ItemSizeToUpdateTooLarge(MockValidationException):
class HashKeyTooLong(MockValidationException):
# deliberately no space between of and {lim}
key_too_large_msg = "One or more parameter values were invalid: Size of hashkey has exceeded the maximum size limit of{lim} bytes".format(
lim=HASH_KEY_MAX_LENGTH
)
key_too_large_msg = f"One or more parameter values were invalid: Size of hashkey has exceeded the maximum size limit of{HASH_KEY_MAX_LENGTH} bytes"
def __init__(self):
super().__init__(self.key_too_large_msg)
class RangeKeyTooLong(MockValidationException):
key_too_large_msg = "One or more parameter values were invalid: Aggregated size of all range keys has exceeded the size limit of {lim} bytes".format(
lim=RANGE_KEY_MAX_LENGTH
)
key_too_large_msg = f"One or more parameter values were invalid: Aggregated size of all range keys has exceeded the size limit of {RANGE_KEY_MAX_LENGTH} bytes"
def __init__(self):
super().__init__(self.key_too_large_msg)
@@ -285,26 +281,25 @@ class ResourceNotFoundException(JsonRESTError):
class TableNotFoundException(JsonRESTError):
def __init__(self, name):
err = "com.amazonaws.dynamodb.v20111205#TableNotFoundException"
msg = "Table not found: {}".format(name)
super().__init__(err, msg)
super().__init__(err, f"Table not found: {name}")
class SourceTableNotFoundException(JsonRESTError):
def __init__(self, source_table_name):
er = "com.amazonaws.dynamodb.v20111205#SourceTableNotFoundException"
super().__init__(er, "Source table not found: %s" % source_table_name)
super().__init__(er, f"Source table not found: {source_table_name}")
class BackupNotFoundException(JsonRESTError):
def __init__(self, backup_arn):
er = "com.amazonaws.dynamodb.v20111205#BackupNotFoundException"
super().__init__(er, "Backup not found: %s" % backup_arn)
super().__init__(er, f"Backup not found: {backup_arn}")
class TableAlreadyExistsException(JsonRESTError):
def __init__(self, target_table_name):
er = "com.amazonaws.dynamodb.v20111205#TableAlreadyExistsException"
super().__init__(er, "Table already exists: %s" % target_table_name)
super().__init__(er, f"Table already exists: {target_table_name}")
class ResourceInUseException(JsonRESTError):
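
The key_too_large_msg conversions differ from the rest in that they sit in a class body: the f-string is evaluated once, when the class statement runs, exactly like the .format() call it replaces. A sketch assuming 2048 for the constant (DynamoDB's documented hash-key limit):

HASH_KEY_MAX_LENGTH = 2048  # assumed value

class HashKeyTooLong(Exception):
    # no space between "of" and the limit, matching the quirk noted above
    key_too_large_msg = f"Size of hashkey has exceeded the maximum size limit of{HASH_KEY_MAX_LENGTH} bytes"

print(HashKeyTooLong.key_too_large_msg)  # evaluated at class-definition time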

View File

@@ -97,7 +97,7 @@ class Item(BaseModel):
)
def __repr__(self):
return "Item: {0}".format(self.to_json())
return f"Item: {self.to_json()}"
def size(self):
return sum(bytesize(key) + value.size() for key, value in self.attrs.items())
@@ -191,7 +191,7 @@ class Item(BaseModel):
)
else:
raise NotImplementedError(
"%s action not support for update_with_attribute_updates" % action
f"{action} action not support for update_with_attribute_updates"
)
# Filter using projection_expression
@@ -814,9 +814,9 @@ class Table(CloudFormationModel):
all_indexes = self.all_indexes()
indexes_by_name = dict((i.name, i) for i in all_indexes)
if index_name not in indexes_by_name:
all_indexes = ", ".join(indexes_by_name.keys())
raise MockValidationException(
"Invalid index: %s for table: %s. Available indexes are: %s"
% (index_name, self.name, ", ".join(indexes_by_name.keys()))
f"Invalid index: {index_name} for table: {self.name}. Available indexes are: {all_indexes}"
)
index = indexes_by_name[index_name]
@@ -826,7 +826,7 @@ class Table(CloudFormationModel):
][0]
except IndexError:
raise MockValidationException(
"Missing Hash Key. KeySchema: %s" % index.name
f"Missing Hash Key. KeySchema: {index.name}"
)
try:
@@ -945,7 +945,7 @@ class Table(CloudFormationModel):
indexes_by_name = dict((i.name, i) for i in all_indexes)
if error_if_not and index_name not in indexes_by_name:
raise InvalidIndexNameError(
"The table does not have the specified index: %s" % index_name
f"The table does not have the specified index: {index_name}"
)
return indexes_by_name[index_name]
@@ -1139,16 +1139,11 @@ class Backup(object):
timestamp_padded = str("0" + str(timestamp))[-16:16]
guid = str(mock_random.uuid4())
guid_shortened = guid[:8]
return "{}-{}".format(timestamp_padded, guid_shortened)
return f"{timestamp_padded}-{guid_shortened}"
@property
def arn(self):
return "arn:aws:dynamodb:{region}:{account}:table/{table_name}/backup/{identifier}".format(
region=self.backend.region_name,
account=self.backend.account_id,
table_name=self.table.name,
identifier=self.identifier,
)
return f"arn:aws:dynamodb:{self.backend.region_name}:{self.backend.account_id}:table/{self.table.name}/backup/{self.identifier}"
@property
def details(self):
@@ -1227,7 +1222,7 @@ class DynamoDBBackend(BaseBackend):
def describe_endpoints(self):
return [
{
"Address": "dynamodb.{}.amazonaws.com".format(self.region_name),
"Address": f"dynamodb.{self.region_name}.amazonaws.com",
"CachePeriodInMinutes": 1440,
}
]
@@ -1420,10 +1415,9 @@ class DynamoDBBackend(BaseBackend):
all_indexes = (table.global_indexes or []) + (table.indexes or [])
indexes_by_name = dict((i.name, i) for i in all_indexes)
if index_name not in indexes_by_name:
all_indexes = ", ".join(indexes_by_name.keys())
raise ResourceNotFoundException(
"Invalid index: {} for table: {}. Available indexes are: {}".format(
index_name, table_name, ", ".join(indexes_by_name.keys())
)
f"Invalid index: {index_name} for table: {table_name}. Available indexes are: {all_indexes}"
)
return indexes_by_name[index_name].schema
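
Both index-error hunks do more than swap syntax: they hoist the ", ".join(...) into an all_indexes local, so the f-string stays short and the join is spelled once. The same shape with illustrative names:

indexes_by_name = {"gsi-1": None, "gsi-2": None}  # illustrative
index_name, table_name = "gsi-3", "my-table"

all_indexes = ", ".join(indexes_by_name.keys())
print(f"Invalid index: {index_name} for table: {table_name}. Available indexes are: {all_indexes}")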

View File

@@ -100,7 +100,7 @@ class DynamoType(object):
return self.cast_value >= other.cast_value
def __repr__(self):
return "DynamoType: {0}".format(self.to_json())
return f"DynamoType: {self.to_json()}"
def __add__(self, other):
if self.type != other.type:
@@ -108,9 +108,7 @@ class DynamoType(object):
if self.is_number():
self_value = float(self.value) if "." in self.value else int(self.value)
other_value = float(other.value) if "." in other.value else int(other.value)
return DynamoType(
{DDBType.NUMBER: "{v}".format(v=self_value + other_value)}
)
return DynamoType({DDBType.NUMBER: f"{self_value + other_value}"})
else:
raise IncorrectDataType()
@@ -120,9 +118,7 @@ class DynamoType(object):
if self.type == DDBType.NUMBER:
self_value = float(self.value) if "." in self.value else int(self.value)
other_value = float(other.value) if "." in other.value else int(other.value)
return DynamoType(
{DDBType.NUMBER: "{v}".format(v=self_value - other_value)}
)
return DynamoType({DDBType.NUMBER: f"{self_value - other_value}"})
else:
raise TypeError("Sum only supported for Numbers.")
@@ -136,9 +132,7 @@ class DynamoType(object):
if self.type == DDBType.LIST:
return self.value[item]
raise TypeError(
"This DynamoType {dt} is not subscriptable by a {it}".format(
dt=self.type, it=type(item)
)
f"This DynamoType {self.type} is not subscriptable by a {type(item)}"
)
def __setitem__(self, key, value):
@@ -153,7 +147,7 @@ class DynamoType(object):
if self.is_map():
self.value[key] = value
else:
raise NotImplementedError("No set_item for {t}".format(t=type(key)))
raise NotImplementedError(f"No set_item for {type(key)}")
@property
def cast_value(self):
@@ -237,4 +231,4 @@ class DynamoType(object):
if self.is_map() or self.is_list():
self.value.pop(key, *args, **kwargs)
else:
raise TypeError("pop not supported for DynamoType {t}".format(t=self.type))
raise TypeError(f"pop not supported for DynamoType {self.type}")
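
As in __add__ and __sub__ above, a replacement field may hold a full expression, and for any value v, f"{v}" is equivalent to str(v):

self_value, other_value = 10, 2.5  # illustrative operands
assert f"{self_value + other_value}" == str(self_value + other_value) == "12.5"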

View File

@@ -129,7 +129,7 @@ class SetExecutor(NodeExecutor):
item_part_to_modify_with_set[attribute_name] = value_to_set
else:
raise NotImplementedError(
"Moto does not support setting {t} yet".format(t=type(element_to_set))
f"Moto does not support setting {type(element_to_set)} yet"
)
@@ -173,7 +173,7 @@ class DeleteExecutor(NodeExecutor):
attribute_name = element.get_attribute_name()
else:
raise NotImplementedError(
"Moto does not support deleting {t} yet".format(t=type(element))
f"Moto does not support deleting {type(element)} yet"
)
container = self.get_item_before_end_of_path(item)
del container[attribute_name]
@@ -203,9 +203,7 @@ class RemoveExecutor(NodeExecutor):
pass
else:
raise NotImplementedError(
"Moto does not support setting {t} yet".format(
t=type(element_to_remove)
)
f"Moto does not support setting {type(element_to_remove)} yet"
)
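
In these messages the replacement field holds a type object; the old .format call and the new f-string both render it through str():

element_to_set = [1, 2, 3]  # illustrative unsupported element
print(f"Moto does not support setting {type(element_to_set)} yet")
# Moto does not support setting <class 'list'> yet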

View File

@@ -64,19 +64,15 @@ class NestableExpressionParserMixin(object):
Returns:
"""
pos = self.token_pos
fc = factory_class.__class__.__name__
logger.debug(
"Move token pos {pos} to continue parsing with specific factory class {fc}".format(
pos=self.token_pos, fc=factory_class.__class__.__name__
)
f"Move token pos {pos} to continue parsing with specific factory class {fc}"
)
# noinspection PyProtectedMember
ast, token_pos = factory_class(**self._initializer_args())._parse_with_pos()
self.target_clauses.append(ast)
logger.debug(
"Continue where previous parsing ended {token_pos}".format(
token_pos=token_pos
)
)
logger.debug(f"Continue where previous parsing ended {token_pos}")
self.token_pos = token_pos
@abstractmethod
@@ -118,9 +114,8 @@ class NestableExpressionParserMixin(object):
Returns:
moto.dynamodb.ast_nodes.Node: Node of an AST representing the Expression as produced by the factory.
"""
assert len(self.target_clauses) > 0, "No nodes for {cn}".format(
cn=self.__class__.__name__
)
cn = self.__class__.__name__
assert len(self.target_clauses) > 0, f"No nodes for {cn}"
target_node = self._nestable_class()(children=[self.target_clauses.pop()])
while len(self.target_clauses) > 0:
target_node = self._nestable_class()(
@@ -358,11 +353,7 @@ class NestableBinExpressionParser(ExpressionParser):
**self._initializer_args()
)._parse_with_pos()
self.target_nodes.append(ast)
logger.debug(
"Continue where previous parsing ended {token_pos}".format(
token_pos=self.token_pos
)
)
logger.debug(f"Continue where previous parsing ended {self.token_pos}")
def _parse(self):
self._parse_target_clause(self._operand_factory_class())
@@ -525,11 +516,8 @@ class UpdateExpressionActionsParser(ExpressionParser, NestableExpressionParserMixin):
@classmethod
def _is_possible_start(cls, token):
raise RuntimeError(
"{class_name} cannot be identified by the next token.".format(
class_name=cls._nestable_class().__name__
)
)
cn = cls._nestable_class().__name__
raise RuntimeError(f"{cn} cannot be identified by the next token.")
@classmethod
@abstractmethod
@@ -562,12 +550,9 @@ class UpdateExpressionActionsParser(ExpressionParser, NestableExpressionParserMixin):
break
if len(self.target_clauses) == 0:
logger.debug(
"Didn't encounter a single {nc} in {nepc}.".format(
nc=self._nestable_class().__name__,
nepc=self._nested_expression_parser_class().__name__,
)
)
nc = self._nestable_class().__name__
nepc = self._nested_expression_parser_class().__name__
logger.debug(f"Didn't encounter a single {nc} in {nepc}.")
self.raise_unexpected_token()
return self._create_node()
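
Where str.format used descriptive keyword placeholders, a direct f-string conversion would inline long attribute chains, so this file first introduces short locals (pos, fc, cn, nc, nepc). The shape of that trade-off, with a hypothetical stand-in class:

class UpdateExpressionSetActionsParser:  # hypothetical stand-in
    pass

cn = UpdateExpressionSetActionsParser.__name__  # short local, as in the commit
print(f"{cn} cannot be identified by the next token.")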

View File

@@ -63,11 +63,9 @@ class Token(object):
def __repr__(self):
if isinstance(self.type, int):
return 'Token("{tt}", "{tv}")'.format(
tt=self.PLACEHOLDER_NAMES[self.type], tv=self.value
)
return f'Token("{self.PLACEHOLDER_NAMES[self.type]}", "{self.value}")'
else:
return 'Token("{tt}", "{tv}")'.format(tt=self.type, tv=self.value)
return f'Token("{self.type}", "{self.value}")'
def __eq__(self, other):
return self.type == other.type and self.value == other.value
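
Token.__repr__ shows the quoting constraint: on the Python versions moto supported when this landed, a replacement field could not reuse the f-string's own quote character (Python 3.12 later relaxed this), so the outer quotes stay single while the literal text uses double quotes:

type_, value = "AttributeName", "Id"   # illustrative token fields
print(f'Token("{type_}", "{value}")')  # Token("AttributeName", "Id")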

View File

@@ -112,9 +112,7 @@ class ExpressionPathResolver(object):
else:
raise InvalidUpdateExpressionInvalidDocumentPath
else:
raise NotImplementedError(
"Path resolution for {t}".format(t=type(child))
)
raise NotImplementedError(f"Path resolution for {type(child)}")
return DDBTypedValue(target)
def resolve_expression_path_nodes_to_dynamo_type(
@@ -216,9 +214,7 @@ class UpdateExpressionFunctionEvaluator(DepthFirstTraverser):
first_arg.value.append(list_element)
return DDBTypedValue(first_arg)
else:
raise NotImplementedError(
"Unsupported function for moto {name}".format(name=function_name)
)
raise NotImplementedError(f"Unsupported function for moto {function_name}")
@classmethod
def get_list_from_ddb_typed_value(cls, node, function_name):
@@ -270,11 +266,7 @@ class ExecuteOperations(DepthFirstTraverser):
elif operator == "-":
return self.get_subtraction(left_operand, right_operand)
else:
raise NotImplementedError(
"Moto does not support operator {operator}".format(
operator=operator
)
)
raise NotImplementedError(f"Moto does not support operator {operator}")
else:
raise NotImplementedError(
"UpdateExpressionValue only has implementations for 1 or 3 children."

View File

@@ -28,9 +28,7 @@ def include_consumed_capacity(val=1.0):
expected_capacity = handler.body.get("ReturnConsumedCapacity", "NONE")
if expected_capacity not in ["NONE", "TOTAL", "INDEXES"]:
type_ = "ValidationException"
message = "1 validation error detected: Value '{}' at 'returnConsumedCapacity' failed to satisfy constraint: Member must satisfy enum value set: [INDEXES, TOTAL, NONE]".format(
expected_capacity
)
message = f"1 validation error detected: Value '{expected_capacity}' at 'returnConsumedCapacity' failed to satisfy constraint: Member must satisfy enum value set: [INDEXES, TOTAL, NONE]"
return (
400,
handler.response_headers,
@@ -730,7 +728,7 @@ class DynamoHandler(BaseResponse):
projection_expression,
)
except ValueError as err:
raise MockValidationException("Bad Filter Expression: {0}".format(err))
raise MockValidationException(f"Bad Filter Expression: {err}")
result = {
"Count": len(items),

View File

@@ -34,7 +34,7 @@ class DynamoType(object):
return self.type == other.type and self.value == other.value
def __repr__(self):
return "DynamoType: {0}".format(self.to_json())
return f"DynamoType: {self.to_json()}"
def add(self, dyn_type):
if self.type == "SS":
@@ -66,7 +66,7 @@ class Item(BaseModel):
self.attrs[key] = DynamoType(value)
def __repr__(self):
return "Item: {0}".format(self.to_json())
return f"Item: {self.to_json()}"
def to_json(self):
attributes = {}

View File

@@ -27,11 +27,7 @@ class ShardIterator(BaseModel):
@property
def arn(self):
return "{}/stream/{}|1|{}".format(
self.stream_shard.table.table_arn,
self.stream_shard.table.latest_stream_label,
self.id,
)
return f"{self.stream_shard.table.table_arn}/stream/{self.stream_shard.table.latest_stream_label}|1|{self.id}"
def to_json(self):
return {"ShardIterator": self.arn}