Techdebt: Replace string-format with f-strings (for m* dirs) (#5686)

This commit is contained in:
Bert Blommers 2022-11-19 23:12:31 -01:00 committed by GitHub
parent 2483111f71
commit 677457c1b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 97 additions and 192 deletions

View File

@ -35,9 +35,7 @@ class BadRequestException(ManagedBlockchainClientError):
def __init__(self, pretty_called_method, operation_error): def __init__(self, pretty_called_method, operation_error):
super().__init__( super().__init__(
"BadRequestException", "BadRequestException",
"An error occurred (BadRequestException) when calling the {0} operation: {1}".format( f"An error occurred (BadRequestException) when calling the {pretty_called_method} operation: {operation_error}",
pretty_called_method, operation_error
),
) )
@ -45,9 +43,7 @@ class InvalidRequestException(ManagedBlockchainClientError):
def __init__(self, pretty_called_method, operation_error): def __init__(self, pretty_called_method, operation_error):
super().__init__( super().__init__(
"InvalidRequestException", "InvalidRequestException",
"An error occurred (InvalidRequestException) when calling the {0} operation: {1}".format( f"An error occurred (InvalidRequestException) when calling the {pretty_called_method} operation: {operation_error}",
pretty_called_method, operation_error
),
) )
@ -56,9 +52,7 @@ class ResourceNotFoundException(ManagedBlockchainClientError):
self.code = 404 self.code = 404
super().__init__( super().__init__(
"ResourceNotFoundException", "ResourceNotFoundException",
"An error occurred (ResourceNotFoundException) when calling the {0} operation: {1}".format( f"An error occurred (ResourceNotFoundException) when calling the {pretty_called_method} operation: {operation_error}",
pretty_called_method, operation_error
),
) )
@ -67,9 +61,7 @@ class ResourceAlreadyExistsException(ManagedBlockchainClientError):
self.code = 409 self.code = 409
super().__init__( super().__init__(
"ResourceAlreadyExistsException", "ResourceAlreadyExistsException",
"An error occurred (ResourceAlreadyExistsException) when calling the {0} operation: {1}".format( f"An error occurred (ResourceAlreadyExistsException) when calling the {pretty_called_method} operation: {operation_error}",
pretty_called_method, operation_error
),
) )
@ -78,7 +70,5 @@ class ResourceLimitExceededException(ManagedBlockchainClientError):
self.code = 429 self.code = 429
super().__init__( super().__init__(
"ResourceLimitExceededException", "ResourceLimitExceededException",
"An error occurred (ResourceLimitExceededException) when calling the {0} operation: {1}".format( f"An error occurred (ResourceLimitExceededException) when calling the {pretty_called_method} operation: {operation_error}",
pretty_called_method, operation_error
),
) )

View File

@ -128,15 +128,13 @@ class ManagedBlockchainNetwork(BaseModel):
# Format for get_network # Format for get_network
frameworkattributes = { frameworkattributes = {
"Fabric": { "Fabric": {
"OrderingServiceEndpoint": "orderer.{0}.managedblockchain.{1}.amazonaws.com:30001".format( "OrderingServiceEndpoint": f"orderer.{self.id.lower()}.managedblockchain.{self.region}.amazonaws.com:30001",
self.id.lower(), self.region
),
"Edition": self.frameworkconfiguration["Fabric"]["Edition"], "Edition": self.frameworkconfiguration["Fabric"]["Edition"],
} }
} }
vpcendpointname = "com.amazonaws.{0}.managedblockchain.{1}".format( vpcendpointname = (
self.region, self.id.lower() f"com.amazonaws.{self.region}.managedblockchain.{self.id.lower()}"
) )
d = { d = {
@ -390,9 +388,7 @@ class ManagedBlockchainMember(BaseModel):
"AdminUsername": self.member_configuration["FrameworkConfiguration"][ "AdminUsername": self.member_configuration["FrameworkConfiguration"][
"Fabric" "Fabric"
]["AdminUsername"], ]["AdminUsername"],
"CaEndpoint": "ca.{0}.{1}.managedblockchain.{2}.amazonaws.com:30002".format( "CaEndpoint": f"ca.{self.id.lower()}.{self.networkid.lower()}.managedblockchain.{self.region}.amazonaws.com:30002",
self.id.lower(), self.networkid.lower(), self.region
),
} }
} }
@ -464,18 +460,8 @@ class ManagedBlockchainNode(BaseModel):
# Format for get_node # Format for get_node
frameworkattributes = { frameworkattributes = {
"Fabric": { "Fabric": {
"PeerEndpoint": "{0}.{1}.{2}.managedblockchain.{3}.amazonaws.com:30003".format( "PeerEndpoint": f"{self.id.lower()}.{self.networkid.lower()}.{self.memberid.lower()}.managedblockchain.{self.region}.amazonaws.com:30003",
self.id.lower(), "PeerEventEndpoint": f"{self.id.lower()}.{self.networkid.lower()}.{self.memberid.lower()}.managedblockchain.{self.region}.amazonaws.com:30004",
self.networkid.lower(),
self.memberid.lower(),
self.region,
),
"PeerEventEndpoint": "{0}.{1}.{2}.managedblockchain.{3}.amazonaws.com:30004".format(
self.id.lower(),
self.networkid.lower(),
self.memberid.lower(),
self.region,
),
} }
} }
@ -526,9 +512,7 @@ class ManagedBlockchainBackend(BaseBackend):
if frameworkversion not in FRAMEWORKVERSIONS: if frameworkversion not in FRAMEWORKVERSIONS:
raise BadRequestException( raise BadRequestException(
"CreateNetwork", "CreateNetwork",
"Invalid version {0} requested for framework HYPERLEDGER_FABRIC".format( f"Invalid version {frameworkversion} requested for framework HYPERLEDGER_FABRIC",
frameworkversion
),
) )
# Check edition # Check edition
@ -569,7 +553,7 @@ class ManagedBlockchainBackend(BaseBackend):
def get_network(self, network_id): def get_network(self, network_id):
if network_id not in self.networks: if network_id not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetNetwork", "Network {0} not found.".format(network_id) "GetNetwork", f"Network {network_id} not found."
) )
return self.networks.get(network_id) return self.networks.get(network_id)
@ -577,13 +561,13 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"CreateProposal", "Network {0} not found.".format(networkid) "CreateProposal", f"Network {networkid} not found."
) )
# Check if member exists # Check if member exists
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"CreateProposal", "Member {0} not found.".format(memberid) "CreateProposal", f"Member {memberid} not found."
) )
# CLI docs say that Invitations and Removals cannot both be passed - but it does # CLI docs say that Invitations and Removals cannot both be passed - but it does
@ -632,7 +616,7 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListProposals", "Network {0} not found.".format(networkid) "ListProposals", f"Network {networkid} not found."
) )
proposalsfornetwork = [] proposalsfornetwork = []
@ -647,12 +631,12 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetProposal", "Network {0} not found.".format(networkid) "GetProposal", f"Network {networkid} not found."
) )
if proposalid not in self.proposals: if proposalid not in self.proposals:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetProposal", "Proposal {0} not found.".format(proposalid) "GetProposal", f"Proposal {proposalid} not found."
) )
# See if it needs to be set to expipred # See if it needs to be set to expipred
@ -663,17 +647,17 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"VoteOnProposal", "Network {0} not found.".format(networkid) "VoteOnProposal", f"Network {networkid} not found."
) )
if proposalid not in self.proposals: if proposalid not in self.proposals:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"VoteOnProposal", "Proposal {0} not found.".format(proposalid) "VoteOnProposal", f"Proposal {proposalid} not found."
) )
if votermemberid not in self.members: if votermemberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"VoteOnProposal", "Member {0} not found.".format(votermemberid) "VoteOnProposal", f"Member {votermemberid} not found."
) )
if vote.upper() not in VOTEVALUES: if vote.upper() not in VOTEVALUES:
@ -686,25 +670,21 @@ class ManagedBlockchainBackend(BaseBackend):
if self.proposals.get(proposalid).proposal_status == "EXPIRED": if self.proposals.get(proposalid).proposal_status == "EXPIRED":
raise InvalidRequestException( raise InvalidRequestException(
"VoteOnProposal", "VoteOnProposal",
"Proposal {0} is expired and you cannot vote on it.".format(proposalid), f"Proposal {proposalid} is expired and you cannot vote on it.",
) )
# Check if IN_PROGRESS # Check if IN_PROGRESS
if self.proposals.get(proposalid).proposal_status != "IN_PROGRESS": if self.proposals.get(proposalid).proposal_status != "IN_PROGRESS":
raise InvalidRequestException( raise InvalidRequestException(
"VoteOnProposal", "VoteOnProposal",
"Proposal {0} has status {1} and you cannot vote on it.".format( f"Proposal {proposalid} has status {self.proposals.get(proposalid).proposal_status} and you cannot vote on it.",
proposalid, self.proposals.get(proposalid).proposal_status
),
) )
# Check to see if this member already voted # Check to see if this member already voted
if votermemberid in self.proposals.get(proposalid).proposal_votes: if votermemberid in self.proposals.get(proposalid).proposal_votes:
raise ResourceAlreadyExistsException( raise ResourceAlreadyExistsException(
"VoteOnProposal", "VoteOnProposal",
"Member {0} has already voted on proposal {1}.".format( f"Member {votermemberid} has already voted on proposal {proposalid}.",
votermemberid, proposalid
),
) )
# Cast vote # Cast vote
@ -741,12 +721,12 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListProposalVotes", "Network {0} not found.".format(networkid) "ListProposalVotes", f"Network {networkid} not found."
) )
if proposalid not in self.proposals: if proposalid not in self.proposals:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListProposalVotes", "Proposal {0} not found.".format(proposalid) "ListProposalVotes", f"Proposal {proposalid} not found."
) )
# Output the vote summaries # Output the vote summaries
@ -765,7 +745,7 @@ class ManagedBlockchainBackend(BaseBackend):
def reject_invitation(self, invitationid): def reject_invitation(self, invitationid):
if invitationid not in self.invitations: if invitationid not in self.invitations:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"RejectInvitation", "InvitationId {0} not found.".format(invitationid) "RejectInvitation", f"InvitationId {invitationid} not found."
) )
self.invitations.get(invitationid).reject_invitation() self.invitations.get(invitationid).reject_invitation()
@ -773,30 +753,25 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"CreateMember", "Network {0} not found.".format(networkid) "CreateMember", f"Network {networkid} not found."
) )
if invitationid not in self.invitations: if invitationid not in self.invitations:
raise InvalidRequestException( raise InvalidRequestException(
"CreateMember", "Invitation {0} not valid".format(invitationid) "CreateMember", f"Invitation {invitationid} not valid"
) )
if self.invitations.get(invitationid).invitation_status != "PENDING": if self.invitations.get(invitationid).invitation_status != "PENDING":
raise InvalidRequestException( raise InvalidRequestException(
"CreateMember", "Invitation {0} not valid".format(invitationid) "CreateMember", f"Invitation {invitationid} not valid"
) )
if ( if member_name_exist_in_network(
member_name_exist_in_network( self.members, networkid, member_configuration["Name"]
self.members, networkid, member_configuration["Name"]
)
is True
): ):
raise InvalidRequestException( raise InvalidRequestException(
"CreateMember", "CreateMember",
"Member name {0} already exists in network {1}.".format( f"Member name {member_configuration['Name']} already exists in network {networkid}.",
member_configuration["Name"], networkid
),
) )
networkedition = self.networks.get(networkid).network_edition networkedition = self.networks.get(networkid).network_edition
@ -806,9 +781,7 @@ class ManagedBlockchainBackend(BaseBackend):
): ):
raise ResourceLimitExceededException( raise ResourceLimitExceededException(
"CreateMember", "CreateMember",
"You cannot create a member in network {0}.{1} is the maximum number of members allowed in a {2} Edition network.".format( f"You cannot create a member in network {networkid}.{EDITIONS[networkedition]['MaxMembers']} is the maximum number of members allowed in a {networkedition} Edition network.",
networkid, EDITIONS[networkedition]["MaxMembers"], networkedition
),
) )
memberadminpassword = member_configuration["FrameworkConfiguration"]["Fabric"][ memberadminpassword = member_configuration["FrameworkConfiguration"]["Fabric"][
@ -836,7 +809,7 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListMembers", "Network {0} not found.".format(networkid) "ListMembers", f"Network {networkid} not found."
) )
membersfornetwork = [] membersfornetwork = []
@ -849,18 +822,18 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetMember", "Network {0} not found.".format(networkid) "GetMember", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetMember", "Member {0} not found.".format(memberid) "GetMember", f"Member {memberid} not found."
) )
# Cannot get a member than has been deleted (it does show up in the list) # Cannot get a member than has been deleted (it does show up in the list)
if self.members.get(memberid).member_status == "DELETED": if self.members.get(memberid).member_status == "DELETED":
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetMember", "Member {0} not found.".format(memberid) "GetMember", f"Member {memberid} not found."
) )
return self.members.get(memberid) return self.members.get(memberid)
@ -869,12 +842,12 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"DeleteMember", "Network {0} not found.".format(networkid) "DeleteMember", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"DeleteMember", "Member {0} not found.".format(memberid) "DeleteMember", f"Member {memberid} not found."
) )
self.members.get(memberid).delete() self.members.get(memberid).delete()
@ -902,12 +875,12 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"UpdateMember", "Network {0} not found.".format(networkid) "UpdateMember", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"UpdateMember", "Member {0} not found.".format(memberid) "UpdateMember", f"Member {memberid} not found."
) )
self.members.get(memberid).update(logpublishingconfiguration) self.members.get(memberid).update(logpublishingconfiguration)
@ -923,12 +896,12 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"CreateNode", "Network {0} not found.".format(networkid) "CreateNode", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"CreateNode", "Member {0} not found.".format(memberid) "CreateNode", f"Member {memberid} not found."
) )
networkedition = self.networks.get(networkid).network_edition networkedition = self.networks.get(networkid).network_edition
@ -938,11 +911,7 @@ class ManagedBlockchainBackend(BaseBackend):
): ):
raise ResourceLimitExceededException( raise ResourceLimitExceededException(
"CreateNode", "CreateNode",
"Maximum number of nodes exceeded in member {0}. The maximum number of nodes you can have in a member in a {1} Edition network is {2}".format( f"Maximum number of nodes exceeded in member {memberid}. The maximum number of nodes you can have in a member in a {networkedition} Edition network is {EDITIONS[networkedition]['MaxNodesPerMember']}",
memberid,
networkedition,
EDITIONS[networkedition]["MaxNodesPerMember"],
),
) )
# See if the instance family is correct # See if the instance family is correct
@ -956,7 +925,7 @@ class ManagedBlockchainBackend(BaseBackend):
if correctinstancefamily is False: if correctinstancefamily is False:
raise InvalidRequestException( raise InvalidRequestException(
"CreateNode", "CreateNode",
"Requested instance {0} isn't supported.".format(instancetype), f"Requested instance {instancetype} isn't supported.",
) )
# Check for specific types for starter # Check for specific types for starter
@ -964,9 +933,7 @@ class ManagedBlockchainBackend(BaseBackend):
if instancetype not in EDITIONS["STARTER"]["AllowedNodeInstanceTypes"]: if instancetype not in EDITIONS["STARTER"]["AllowedNodeInstanceTypes"]:
raise InvalidRequestException( raise InvalidRequestException(
"CreateNode", "CreateNode",
"Instance type {0} is not supported with STARTER Edition networks.".format( f"Instance type {instancetype} is not supported with STARTER Edition networks.",
instancetype
),
) )
# Simple availability zone check # Simple availability zone check
@ -994,18 +961,18 @@ class ManagedBlockchainBackend(BaseBackend):
def list_nodes(self, networkid, memberid, status=None): def list_nodes(self, networkid, memberid, status=None):
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListNodes", "Network {0} not found.".format(networkid) "ListNodes", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListNodes", "Member {0} not found.".format(memberid) "ListNodes", f"Member {memberid} not found."
) )
# If member is deleted, cannot list nodes # If member is deleted, cannot list nodes
if self.members.get(memberid).member_status == "DELETED": if self.members.get(memberid).member_status == "DELETED":
raise ResourceNotFoundException( raise ResourceNotFoundException(
"ListNodes", "Member {0} not found.".format(memberid) "ListNodes", f"Member {memberid} not found."
) )
nodesformember = [] nodesformember = []
@ -1020,24 +987,18 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"GetNode", "Network {0} not found.".format(networkid) "GetNode", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException("GetNode", f"Member {memberid} not found.")
"GetNode", "Member {0} not found.".format(memberid)
)
if nodeid not in self.nodes: if nodeid not in self.nodes:
raise ResourceNotFoundException( raise ResourceNotFoundException("GetNode", f"Node {nodeid} not found.")
"GetNode", "Node {0} not found.".format(nodeid)
)
# Cannot get a node than has been deleted (it does show up in the list) # Cannot get a node than has been deleted (it does show up in the list)
if self.nodes.get(nodeid).node_status == "DELETED": if self.nodes.get(nodeid).node_status == "DELETED":
raise ResourceNotFoundException( raise ResourceNotFoundException("GetNode", f"Node {nodeid} not found.")
"GetNode", "Node {0} not found.".format(nodeid)
)
return self.nodes.get(nodeid) return self.nodes.get(nodeid)
@ -1045,18 +1006,16 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"DeleteNode", "Network {0} not found.".format(networkid) "DeleteNode", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"DeleteNode", "Member {0} not found.".format(memberid) "DeleteNode", f"Member {memberid} not found."
) )
if nodeid not in self.nodes: if nodeid not in self.nodes:
raise ResourceNotFoundException( raise ResourceNotFoundException("DeleteNode", f"Node {nodeid} not found.")
"DeleteNode", "Node {0} not found.".format(nodeid)
)
self.nodes.get(nodeid).delete() self.nodes.get(nodeid).delete()
@ -1064,18 +1023,16 @@ class ManagedBlockchainBackend(BaseBackend):
# Check if network exists # Check if network exists
if networkid not in self.networks: if networkid not in self.networks:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"UpdateNode", "Network {0} not found.".format(networkid) "UpdateNode", f"Network {networkid} not found."
) )
if memberid not in self.members: if memberid not in self.members:
raise ResourceNotFoundException( raise ResourceNotFoundException(
"UpdateNode", "Member {0} not found.".format(memberid) "UpdateNode", f"Member {memberid} not found."
) )
if nodeid not in self.nodes: if nodeid not in self.nodes:
raise ResourceNotFoundException( raise ResourceNotFoundException("UpdateNode", f"Node {nodeid} not found.")
"UpdateNode", "Node {0} not found.".format(nodeid)
)
self.nodes.get(nodeid).update(logpublishingconfiguration) self.nodes.get(nodeid).update(logpublishingconfiguration)

View File

@ -215,9 +215,7 @@ class MediaConnectBackend(BaseBackend):
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
flow.vpc_interfaces = vpc_interfaces flow.vpc_interfaces = vpc_interfaces
else: else:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
return flow_arn, flow.vpc_interfaces return flow_arn, flow.vpc_interfaces
def add_flow_outputs(self, flow_arn, outputs): def add_flow_outputs(self, flow_arn, outputs):
@ -225,9 +223,7 @@ class MediaConnectBackend(BaseBackend):
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
flow.outputs = outputs flow.outputs = outputs
else: else:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
return flow_arn, flow.outputs return flow_arn, flow.outputs
def remove_flow_vpc_interface(self, flow_arn, vpc_interface_name): def remove_flow_vpc_interface(self, flow_arn, vpc_interface_name):
@ -239,9 +235,7 @@ class MediaConnectBackend(BaseBackend):
if vpc_interface["name"] != vpc_interface_name if vpc_interface["name"] != vpc_interface_name
] ]
else: else:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
return flow_arn, vpc_interface_name return flow_arn, vpc_interface_name
def remove_flow_output(self, flow_arn, output_name): def remove_flow_output(self, flow_arn, output_name):
@ -253,9 +247,7 @@ class MediaConnectBackend(BaseBackend):
if output["name"] != output_name if output["name"] != output_name
] ]
else: else:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
return flow_arn, output_name return flow_arn, output_name
def update_flow_output( def update_flow_output(
@ -279,9 +271,7 @@ class MediaConnectBackend(BaseBackend):
vpc_interface_attachment, vpc_interface_attachment,
): ):
if flow_arn not in self._flows: if flow_arn not in self._flows:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
for output in flow.outputs: for output in flow.outputs:
if output["outputArn"] == output_arn: if output["outputArn"] == output_arn:
@ -303,15 +293,11 @@ class MediaConnectBackend(BaseBackend):
output["streamId"] = stream_id output["streamId"] = stream_id
output["vpcInterfaceAttachment"] = vpc_interface_attachment output["vpcInterfaceAttachment"] = vpc_interface_attachment
return flow_arn, output return flow_arn, output
raise NotFoundException( raise NotFoundException(message=f"output with arn={output_arn} not found")
message="output with arn={} not found".format(output_arn)
)
def add_flow_sources(self, flow_arn, sources): def add_flow_sources(self, flow_arn, sources):
if flow_arn not in self._flows: if flow_arn not in self._flows:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
for source in sources: for source in sources:
source_id = random.uuid4().hex source_id = random.uuid4().hex
@ -342,9 +328,7 @@ class MediaConnectBackend(BaseBackend):
whitelist_cidr, whitelist_cidr,
): ):
if flow_arn not in self._flows: if flow_arn not in self._flows:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
source = next( source = next(
iter( iter(
@ -378,9 +362,7 @@ class MediaConnectBackend(BaseBackend):
entitlements, entitlements,
): ):
if flow_arn not in self._flows: if flow_arn not in self._flows:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
for entitlement in entitlements: for entitlement in entitlements:
entitlement_id = random.uuid4().hex entitlement_id = random.uuid4().hex
@ -393,16 +375,14 @@ class MediaConnectBackend(BaseBackend):
def revoke_flow_entitlement(self, flow_arn, entitlement_arn): def revoke_flow_entitlement(self, flow_arn, entitlement_arn):
if flow_arn not in self._flows: if flow_arn not in self._flows:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
for entitlement in flow.entitlements: for entitlement in flow.entitlements:
if entitlement_arn == entitlement["entitlementArn"]: if entitlement_arn == entitlement["entitlementArn"]:
flow.entitlements.remove(entitlement) flow.entitlements.remove(entitlement)
return flow_arn, entitlement_arn return flow_arn, entitlement_arn
raise NotFoundException( raise NotFoundException(
message="entitlement with arn={} not found".format(entitlement_arn) message=f"entitlement with arn={entitlement_arn} not found"
) )
def update_flow_entitlement( def update_flow_entitlement(
@ -416,9 +396,7 @@ class MediaConnectBackend(BaseBackend):
subscribers, subscribers,
): ):
if flow_arn not in self._flows: if flow_arn not in self._flows:
raise NotFoundException( raise NotFoundException(message=f"flow with arn={flow_arn} not found")
message="flow with arn={} not found".format(flow_arn)
)
flow = self._flows[flow_arn] flow = self._flows[flow_arn]
for entitlement in flow.entitlements: for entitlement in flow.entitlements:
if entitlement_arn == entitlement["entitlementArn"]: if entitlement_arn == entitlement["entitlementArn"]:
@ -429,7 +407,7 @@ class MediaConnectBackend(BaseBackend):
entitlement["subscribers"] = subscribers entitlement["subscribers"] = subscribers
return flow_arn, entitlement return flow_arn, entitlement
raise NotFoundException( raise NotFoundException(
message="entitlement with arn={} not found".format(entitlement_arn) message=f"entitlement with arn={entitlement_arn} not found"
) )
# add methods from here # add methods from here

View File

@ -134,7 +134,7 @@ class MediaLiveBackend(BaseBackend):
The RequestID and Reserved parameters are not yet implemented The RequestID and Reserved parameters are not yet implemented
""" """
channel_id = mock_random.uuid4().hex channel_id = mock_random.uuid4().hex
arn = "arn:aws:medialive:channel:{}".format(channel_id) arn = f"arn:aws:medialive:channel:{channel_id}"
channel = Channel( channel = Channel(
arn=arn, arn=arn,
cdi_input_specification=cdi_input_specification, cdi_input_specification=cdi_input_specification,
@ -228,7 +228,7 @@ class MediaLiveBackend(BaseBackend):
The VPC and RequestId parameters are not yet implemented The VPC and RequestId parameters are not yet implemented
""" """
input_id = mock_random.uuid4().hex input_id = mock_random.uuid4().hex
arn = "arn:aws:medialive:input:{}".format(input_id) arn = f"arn:aws:medialive:input:{input_id}"
a_input = Input( a_input = Input(
arn=arn, arn=arn,
input_id=input_id, input_id=input_id,

View File

@ -73,7 +73,7 @@ class MediaPackageBackend(BaseBackend):
self._origin_endpoints = OrderedDict() self._origin_endpoints = OrderedDict()
def create_channel(self, description, channel_id, tags): def create_channel(self, description, channel_id, tags):
arn = "arn:aws:mediapackage:channel:{}".format(channel_id) arn = f"arn:aws:mediapackage:channel:{channel_id}"
channel = Channel( channel = Channel(
arn=arn, arn=arn,
description=description, description=description,
@ -97,7 +97,7 @@ class MediaPackageBackend(BaseBackend):
return channel.to_dict() return channel.to_dict()
except KeyError: except KeyError:
error = "NotFoundException" error = "NotFoundException"
raise ClientError(error, "channel with id={} not found".format(channel_id)) raise ClientError(error, f"channel with id={channel_id} not found")
def delete_channel(self, channel_id): def delete_channel(self, channel_id):
try: try:
@ -107,7 +107,7 @@ class MediaPackageBackend(BaseBackend):
except KeyError: except KeyError:
error = "NotFoundException" error = "NotFoundException"
raise ClientError(error, "channel with id={} not found".format(channel_id)) raise ClientError(error, f"channel with id={channel_id} not found")
def create_origin_endpoint( def create_origin_endpoint(
self, self,
@ -126,10 +126,8 @@ class MediaPackageBackend(BaseBackend):
time_delay_seconds, time_delay_seconds,
whitelist, whitelist,
): ):
arn = "arn:aws:mediapackage:origin_endpoint:{}".format(endpoint_id) arn = f"arn:aws:mediapackage:origin_endpoint:{endpoint_id}"
url = "https://origin-endpoint.mediapackage.{}.amazonaws.com/{}".format( url = f"https://origin-endpoint.mediapackage.{self.region_name}.amazonaws.com/{endpoint_id}"
self.region_name, endpoint_id
)
origin_endpoint = OriginEndpoint( origin_endpoint = OriginEndpoint(
arn=arn, arn=arn,
authorization=authorization, authorization=authorization,
@ -157,9 +155,7 @@ class MediaPackageBackend(BaseBackend):
return origin_endpoint.to_dict() return origin_endpoint.to_dict()
except KeyError: except KeyError:
error = "NotFoundException" error = "NotFoundException"
raise ClientError( raise ClientError(error, f"origin endpoint with id={endpoint_id} not found")
error, "origin endpoint with id={} not found".format(endpoint_id)
)
def list_origin_endpoints(self): def list_origin_endpoints(self):
origin_endpoints = list(self._origin_endpoints.values()) origin_endpoints = list(self._origin_endpoints.values())
@ -173,9 +169,7 @@ class MediaPackageBackend(BaseBackend):
return origin_endpoint.to_dict() return origin_endpoint.to_dict()
except KeyError: except KeyError:
error = "NotFoundException" error = "NotFoundException"
raise ClientError( raise ClientError(error, f"origin endpoint with id={endpoint_id} not found")
error, "origin endpoint with id={} not found".format(endpoint_id)
)
def update_origin_endpoint( def update_origin_endpoint(
self, self,
@ -209,9 +203,7 @@ class MediaPackageBackend(BaseBackend):
except KeyError: except KeyError:
error = "NotFoundException" error = "NotFoundException"
raise ClientError( raise ClientError(error, f"origin endpoint with id={endpoint_id} not found")
error, "origin endpoint with id={} not found".format(endpoint_id)
)
mediapackage_backends = BackendDict(MediaPackageBackend, "mediapackage") mediapackage_backends = BackendDict(MediaPackageBackend, "mediapackage")

View File

@ -42,11 +42,11 @@ class MediaStoreBackend(BaseBackend):
self._containers = OrderedDict() self._containers = OrderedDict()
def create_container(self, name, tags): def create_container(self, name, tags):
arn = "arn:aws:mediastore:container:{}".format(name) arn = f"arn:aws:mediastore:container:{name}"
container = Container( container = Container(
arn=arn, arn=arn,
name=name, name=name,
endpoint="/{}".format(name), endpoint=f"/{name}",
status="CREATING", status="CREATING",
creation_time=date.today().strftime("%m/%d/%Y, %H:%M:%S"), creation_time=date.today().strftime("%m/%d/%Y, %H:%M:%S"),
tags=tags, tags=tags,

View File

@ -45,7 +45,7 @@ class MediaStoreDataBackend(BaseBackend):
def delete_object(self, path): def delete_object(self, path):
if path not in self._objects: if path not in self._objects:
error = "ObjectNotFoundException" error = "ObjectNotFoundException"
raise ClientError(error, "Object with id={} not found".format(path)) raise ClientError(error, f"Object with id={path} not found")
del self._objects[path] del self._objects[path]
return {} return {}
@ -56,7 +56,7 @@ class MediaStoreDataBackend(BaseBackend):
objects_found = [item for item in self._objects.values() if item.path == path] objects_found = [item for item in self._objects.values() if item.path == path]
if len(objects_found) == 0: if len(objects_found) == 0:
error = "ObjectNotFoundException" error = "ObjectNotFoundException"
raise ClientError(error, "Object with id={} not found".format(path)) raise ClientError(error, f"Object with id={path} not found")
return objects_found[0] return objects_found[0]
def list_items(self): def list_items(self):

View File

@ -23,10 +23,8 @@ class AWSTestHelper(FlaskClient):
opts = {"Action": action_name} opts = {"Action": action_name}
opts.update(kwargs) opts.update(kwargs)
res = self.get( res = self.get(
"/?{0}".format(urlencode(opts)), f"/?{urlencode(opts)}",
headers={ headers={"Host": f"{self.application.service}.us-east-1.amazonaws.com"},
"Host": "{0}.us-east-1.amazonaws.com".format(self.application.service)
},
) )
return res.data.decode("utf-8") return res.data.decode("utf-8")

View File

@ -74,15 +74,13 @@ class DomainDispatcherApplication(object):
return host return host
for backend, pattern in self.backend_url_patterns: for backend, pattern in self.backend_url_patterns:
if pattern.match("http://%s" % host): if pattern.match(f"http://{host}"):
return backend return backend
if "amazonaws.com" in host: if "amazonaws.com" in host:
print( # noqa print( # noqa
"Unable to find appropriate backend for {}." f"Unable to find appropriate backend for {host}."
"Remember to add the URL to urls.py, and run scripts/update_backend_index.py to index it.".format( "Remember to add the URL to urls.py, and run scripts/update_backend_index.py to index it."
host
)
) )
def infer_service_region_host(self, body, environ): def infer_service_region_host(self, body, environ):
@ -129,9 +127,7 @@ class DomainDispatcherApplication(object):
elif service == "mediastore" and not target: elif service == "mediastore" and not target:
# All MediaStore API calls have a target header # All MediaStore API calls have a target header
# If no target is set, assume we're trying to reach the mediastore-data service # If no target is set, assume we're trying to reach the mediastore-data service
host = "data.{service}.{region}.amazonaws.com".format( host = f"data.{service}.{region}.amazonaws.com"
service=service, region=region
)
elif service == "dynamodb": elif service == "dynamodb":
if environ["HTTP_X_AMZ_TARGET"].startswith("DynamoDBStreams"): if environ["HTTP_X_AMZ_TARGET"].startswith("DynamoDBStreams"):
host = "dynamodbstreams" host = "dynamodbstreams"
@ -145,21 +141,15 @@ class DomainDispatcherApplication(object):
else: else:
host = "dynamodb" host = "dynamodb"
elif service == "sagemaker": elif service == "sagemaker":
host = "api.{service}.{region}.amazonaws.com".format( host = f"api.{service}.{region}.amazonaws.com"
service=service, region=region
)
elif service == "timestream": elif service == "timestream":
host = "ingest.{service}.{region}.amazonaws.com".format( host = f"ingest.{service}.{region}.amazonaws.com"
service=service, region=region
)
elif service == "s3" and ( elif service == "s3" and (
path.startswith("/v20180820/") or "s3-control" in environ["HTTP_HOST"] path.startswith("/v20180820/") or "s3-control" in environ["HTTP_HOST"]
): ):
host = "s3control" host = "s3control"
else: else:
host = "{service}.{region}.amazonaws.com".format( host = f"{service}.{region}.amazonaws.com"
service=service, region=region
)
return host return host
@ -281,7 +271,7 @@ def create_backend_app(service):
for url_path, handler in backend.flask_paths.items(): for url_path, handler in backend.flask_paths.items():
view_func = convert_to_flask_response(handler) view_func = convert_to_flask_response(handler)
if handler.__name__ == "dispatch": if handler.__name__ == "dispatch":
endpoint = "{0}.dispatch".format(handler.__self__.__name__) endpoint = f"{handler.__self__.__name__}.dispatch"
else: else:
endpoint = view_func.__name__ endpoint = view_func.__name__