BUG: fix lint error E721 (#6767)

Co-authored-by: 渡邊 美希 <miki.watanabe@watanabes-MacBook-Pro.local>
Co-authored-by: Bert Blommers <info@bertblommers.nl>
This commit is contained in:
Miki Watanabe 2023-09-06 22:08:35 +09:00 committed by GitHub
parent ac7d89c120
commit 5b2da11e19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 59 additions and 70 deletions

View File

@ -187,7 +187,7 @@ class DynamoType(object):
value_size = sum(
[bytesize(k) + DynamoType(v).size() for k, v in self.value.items()]
)
elif type(self.value) == bool:
elif isinstance(self.value, bool):
value_size = 1
else:
value_size = bytesize(self.value)

View File

@ -494,18 +494,18 @@ class Table(CloudFormationModel):
self, item_attrs: Dict[str, Any], attr: Optional[str] = None
) -> None:
for key, value in item_attrs.items():
if type(value) == dict:
if isinstance(value, dict):
self._validate_item_types(value, attr=key if attr is None else key)
elif type(value) == int and key == "N":
elif isinstance(value, int) and key == "N":
raise InvalidConversion
if key == "S":
# This scenario is usually caught by boto3, but the user can disable parameter validation
# Which is why we need to catch it 'server-side' as well
if type(value) == int:
if isinstance(value, int):
raise SerializationException(
"NUMBER_VALUE cannot be converted to String"
)
if attr and attr in self.table_key_attrs and type(value) == dict:
if attr and attr in self.table_key_attrs and isinstance(value, dict):
raise SerializationException(
"Start of structure or map found where not expected"
)

View File

@ -505,9 +505,9 @@ class DynamoHandler(BaseResponse):
keys = request["Key"]
delete_requests.append((table_name, keys))
for (table_name, item) in put_requests:
for table_name, item in put_requests:
self.dynamodb_backend.put_item(table_name, item)
for (table_name, keys) in delete_requests:
for table_name, keys in delete_requests:
self.dynamodb_backend.delete_item(table_name, keys)
response = {
@ -920,7 +920,7 @@ class DynamoHandler(BaseResponse):
if type(changed) != type(original):
return changed
else:
if type(changed) is dict:
if isinstance(changed, dict):
return {
key: self._build_updated_new_attributes(
original.get(key, None), changed[key]
@ -994,7 +994,6 @@ class DynamoHandler(BaseResponse):
consumed_capacity: Dict[str, Any] = dict()
for transact_item in transact_items:
table_name = transact_item["Get"]["TableName"]
key = transact_item["Get"]["Key"]
item = self.dynamodb_backend.get_item(table_name, key)

View File

@ -612,7 +612,6 @@ class SecurityGroupBackend:
def get_security_group_by_name_or_id(
self, group_name_or_id: str, vpc_id: Optional[str] = None
) -> Optional[SecurityGroup]:
# try searching by id, fallbacks to name search
group = self.get_security_group_from_id(group_name_or_id)
if group is None:
@ -650,7 +649,7 @@ class SecurityGroupBackend:
if ip_ranges:
for cidr in ip_ranges:
if (
type(cidr) is dict
isinstance(cidr, dict)
and not any(
[
is_valid_cidr(cidr.get("CidrIp", "")),
@ -658,7 +657,7 @@ class SecurityGroupBackend:
]
)
) or (
type(cidr) is str
isinstance(cidr, str)
and not any([is_valid_cidr(cidr), is_valid_ipv6_cidr(cidr)])
):
raise InvalidCIDRSubnetError(cidr=cidr)
@ -729,7 +728,6 @@ class SecurityGroupBackend:
security_rule_ids: Optional[List[str]] = None,
vpc_id: Optional[str] = None,
) -> None:
group: SecurityGroup = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) # type: ignore[assignment]
if security_rule_ids:
@ -806,7 +804,6 @@ class SecurityGroupBackend:
if group is None:
raise InvalidSecurityGroupNotFoundError(group_name_or_id)
if ip_ranges and not isinstance(ip_ranges, list):
if isinstance(ip_ranges, str) and "CidrIp" not in ip_ranges:
ip_ranges = [{"CidrIp": ip_ranges}]
else:
@ -814,7 +811,7 @@ class SecurityGroupBackend:
if ip_ranges:
for cidr in ip_ranges:
if (
type(cidr) is dict
isinstance(cidr, dict)
and not any(
[
is_valid_cidr(cidr.get("CidrIp", "")),
@ -822,7 +819,7 @@ class SecurityGroupBackend:
]
)
) or (
type(cidr) is str
isinstance(cidr, str)
and not any([is_valid_cidr(cidr), is_valid_ipv6_cidr(cidr)])
):
raise InvalidCIDRSubnetError(cidr=cidr)
@ -896,7 +893,6 @@ class SecurityGroupBackend:
security_rule_ids: Optional[List[str]] = None,
vpc_id: Optional[str] = None,
) -> None:
group: SecurityGroup = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id) # type: ignore[assignment]
if security_rule_ids:
@ -982,12 +978,10 @@ class SecurityGroupBackend:
security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument
vpc_id: Optional[str] = None,
) -> SecurityGroup:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if group is None:
raise InvalidSecurityGroupNotFoundError(group_name_or_id)
if ip_ranges and not isinstance(ip_ranges, list):
if isinstance(ip_ranges, str) and "CidrIp" not in ip_ranges:
ip_ranges = [{"CidrIp": ip_ranges}]
else:
@ -995,7 +989,7 @@ class SecurityGroupBackend:
if ip_ranges:
for cidr in ip_ranges:
if (
type(cidr) is dict # type: ignore
isinstance(cidr, dict) # type: ignore
and not any(
[
is_valid_cidr(cidr.get("CidrIp", "")),
@ -1003,7 +997,7 @@ class SecurityGroupBackend:
]
)
) or (
type(cidr) is str
isinstance(cidr, str)
and not any([is_valid_cidr(cidr), is_valid_ipv6_cidr(cidr)])
):
raise InvalidCIDRSubnetError(cidr=cidr)
@ -1039,12 +1033,10 @@ class SecurityGroupBackend:
security_rule_ids: Optional[List[str]] = None, # pylint:disable=unused-argument
vpc_id: Optional[str] = None,
) -> SecurityGroup:
group = self.get_security_group_by_name_or_id(group_name_or_id, vpc_id)
if group is None:
raise InvalidSecurityGroupNotFoundError(group_name_or_id)
if ip_ranges and not isinstance(ip_ranges, list):
if isinstance(ip_ranges, str) and "CidrIp" not in ip_ranges:
ip_ranges = [{"CidrIp": ip_ranges}]
else:
@ -1052,7 +1044,7 @@ class SecurityGroupBackend:
if ip_ranges:
for cidr in ip_ranges:
if (
type(cidr) is dict # type: ignore
isinstance(cidr, dict) # type: ignore
and not any(
[
is_valid_cidr(cidr.get("CidrIp", "")),
@ -1060,7 +1052,7 @@ class SecurityGroupBackend:
]
)
) or (
type(cidr) is str
isinstance(cidr, str)
and not any([is_valid_cidr(cidr), is_valid_ipv6_cidr(cidr)])
):
raise InvalidCIDRSubnetError(cidr=cidr)

View File

@ -103,9 +103,9 @@ class Unflattener:
@staticmethod
def _add_to_container(container: Any, key: Any, value: Any) -> Any: # type: ignore[misc]
if type(container) is dict:
if isinstance(container, dict):
container[key] = value
elif type(container) is list:
elif isinstance(container, list):
i = int(key)
while len(container) < i:
container.append(None)
@ -114,17 +114,17 @@ class Unflattener:
@staticmethod
def _get_child(container: Any, key: Any) -> Any: # type: ignore[misc]
if type(container) is dict:
if isinstance(container, dict):
return container[key]
elif type(container) is list:
elif isinstance(container, list):
i = int(key)
return container[i - 1]
@staticmethod
def _key_in_container(container: Any, key: Any) -> bool: # type: ignore
if type(container) is dict:
if isinstance(container, dict):
return key in container
elif type(container) is list:
elif isinstance(container, list):
i = int(key)
return len(container) >= i
@ -161,7 +161,6 @@ class CamelToUnderscoresWalker:
class ReleaseLabel:
version_re = re.compile(r"^emr-(\d+)\.(\d+)\.(\d+)$")
def __init__(self, release_label: str):
@ -266,7 +265,6 @@ class EmrManagedServiceAccessSecurityGroup(EmrManagedSecurityGroup):
class EmrSecurityGroupManager:
MANAGED_RULES_EGRESS = [
{
"group_name_or_id": EmrManagedSecurityGroup.Kind.MASTER,

View File

@ -100,7 +100,7 @@ def metadata_from_headers(headers: Dict[str, Any]) -> CaseInsensitiveDict: # ty
if meta_key:
metadata[meta_key] = (
headers[header][0]
if type(headers[header]) == list
if isinstance(headers[header], list)
else headers[header]
)
return metadata

View File

@ -1 +1 @@
ignore = ["E501", "E721"]
ignore = ["E501"]

View File

@ -209,7 +209,7 @@ def test_volume_filters():
def verify_filter(name, value, expected=None, not_expected=None):
multiple_results = not_expected is not None
expected = expected or block_volume
expected = expected if type(expected) == list else [expected]
expected = expected if isinstance(expected, list) else [expected]
volumes = client.describe_volumes(Filters=[{"Name": name, "Values": [value]}])[
"Volumes"
]
@ -482,7 +482,7 @@ def test_snapshot_filters():
)
def verify_filter(name, value, expected, others=False):
expected = expected if type(expected) == list else [expected]
expected = expected if isinstance(expected, list) else [expected]
snapshots = client.describe_snapshots(
Filters=[{"Name": name, "Values": [value]}]
)["Snapshots"]

View File

@ -248,7 +248,7 @@ def test_default_network_acl_default_entries():
assert entry["Protocol"] == "-1"
assert entry["RuleNumber"] in [100, 32767]
assert entry["RuleAction"] in ["allow", "deny"]
assert type(entry["Egress"]) is bool
assert isinstance(entry["Egress"], bool)
if entry["RuleAction"] == "allow":
assert entry["RuleNumber"] == 100
else:

View File

@ -450,7 +450,7 @@ def test_put_image_with_push_date():
describe_response = client.describe_images(repositoryName="test_repository")
assert type(describe_response["imageDetails"]) == list
assert isinstance(describe_response["imageDetails"], list)
assert len(describe_response["imageDetails"]) == 2
assert {
@ -487,7 +487,7 @@ def test_put_image_with_multiple_tags():
assert response1["image"]["registryId"] == ACCOUNT_ID
response2 = client.describe_images(repositoryName="test_repository")
assert type(response2["imageDetails"]) == list
assert isinstance(response2["imageDetails"], list)
assert len(response2["imageDetails"]) == 1
assert "sha" in response2["imageDetails"][0]["imageDigest"]
@ -659,7 +659,7 @@ def test_list_images():
)
response = client.list_images(repositoryName="test_repository_1")
assert type(response["imageIds"]) == list
assert isinstance(response["imageIds"], list)
assert len(response["imageIds"]) == 3
for image in response["imageIds"]:
@ -673,7 +673,7 @@ def test_list_images():
} == set(image_tags)
response = client.list_images(repositoryName="test_repository_2")
assert type(response["imageIds"]) == list
assert isinstance(response["imageIds"], list)
assert len(response["imageIds"]) == 1
assert response["imageIds"][0]["imageTag"] == "oldest"
assert "sha" in response["imageIds"][0]["imageDigest"]
@ -739,7 +739,7 @@ def test_describe_images():
)
response = client.describe_images(repositoryName="test_repository")
assert type(response["imageDetails"]) == list
assert isinstance(response["imageDetails"], list)
assert len(response["imageDetails"]) == 7
for detail in response["imageDetails"][0:5]:
@ -1037,7 +1037,7 @@ def test_batch_get_image():
repositoryName="test_repository", imageIds=[{"imageTag": "v2"}]
)
assert type(response["images"]) == list
assert isinstance(response["images"], list)
assert len(response["images"]) == 1
assert (
@ -1050,7 +1050,7 @@ def test_batch_get_image():
assert response["images"][0]["imageId"]["imageTag"] == "v2"
assert "sha" in response["images"][0]["imageId"]["imageDigest"]
assert type(response["failures"]) == list
assert isinstance(response["failures"], list)
assert len(response["failures"]) == 0
@ -1081,10 +1081,10 @@ def test_batch_get_image_that_doesnt_exist():
repositoryName="test_repository", imageIds=[{"imageTag": "v5"}]
)
assert type(response["images"]) == list
assert isinstance(response["images"], list)
assert len(response["images"]) == 0
assert type(response["failures"]) == list
assert isinstance(response["failures"], list)
assert len(response["failures"]) == 1
assert response["failures"][0]["failureReason"] == "Requested image not found"
assert response["failures"][0]["failureCode"] == "ImageNotFound"
@ -1148,18 +1148,18 @@ def test_batch_delete_image_by_tag():
describe_response2 = client.describe_images(repositoryName="test_repository")
assert type(describe_response1["imageDetails"][0]["imageTags"]) == list
assert isinstance(describe_response1["imageDetails"][0]["imageTags"], list)
assert len(describe_response1["imageDetails"][0]["imageTags"]) == 3
assert type(describe_response2["imageDetails"][0]["imageTags"]) == list
assert isinstance(describe_response2["imageDetails"][0]["imageTags"], list)
assert len(describe_response2["imageDetails"][0]["imageTags"]) == 2
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 1
assert batch_delete_response["imageIds"][0]["imageTag"] == "latest"
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 0
@ -1184,18 +1184,18 @@ def test_batch_delete_image_delete_last_tag():
describe_response2 = client.describe_images(repositoryName="test_repository")
assert type(describe_response1["imageDetails"][0]["imageTags"]) == list
assert isinstance(describe_response1["imageDetails"][0]["imageTags"], list)
assert len(describe_response1["imageDetails"][0]["imageTags"]) == 1
assert type(describe_response2["imageDetails"]) == list
assert isinstance(describe_response2["imageDetails"], list)
assert len(describe_response2["imageDetails"]) == 0
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 1
assert batch_delete_response["imageIds"][0]["imageTag"] == "v1"
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 0
@ -1223,10 +1223,10 @@ def test_batch_delete_image_with_nonexistent_tag():
imageIds=[{"imageTag": missing_tag}],
)
assert type(describe_response["imageDetails"][0]["imageTags"]) == list
assert isinstance(describe_response["imageDetails"][0]["imageTags"], list)
assert len(describe_response["imageDetails"][0]["imageTags"]) == 3
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 0
assert batch_delete_response["failures"][0]["imageId"]["imageTag"] == missing_tag
@ -1236,7 +1236,7 @@ def test_batch_delete_image_with_nonexistent_tag():
== "Requested image not found"
)
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 1
@ -1266,10 +1266,10 @@ def test_batch_delete_image_by_digest():
describe_response = client.describe_images(repositoryName="test_repository")
assert type(describe_response["imageDetails"]) == list
assert isinstance(describe_response["imageDetails"], list)
assert len(describe_response["imageDetails"]) == 0
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 3
assert batch_delete_response["imageIds"][0]["imageDigest"] == image_digest
@ -1282,7 +1282,7 @@ def test_batch_delete_image_by_digest():
batch_delete_response["imageIds"][2]["imageTag"],
} == set(tags)
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 0
@ -1309,10 +1309,10 @@ def test_batch_delete_image_with_invalid_digest():
imageIds=[{"imageDigest": invalid_image_digest}],
)
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 0
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 1
assert (
@ -1335,10 +1335,10 @@ def test_batch_delete_image_with_missing_parameters():
registryId="012345678910", repositoryName="test_repository", imageIds=[{}]
)
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 0
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 1
assert batch_delete_response["failures"][0]["failureCode"] == "MissingDigestAndTag"
@ -1374,10 +1374,10 @@ def test_batch_delete_image_with_matching_digest_and_tag():
describe_response = client.describe_images(repositoryName="test_repository")
assert type(describe_response["imageDetails"]) == list
assert isinstance(describe_response["imageDetails"], list)
assert len(describe_response["imageDetails"]) == 0
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 3
assert batch_delete_response["imageIds"][0]["imageDigest"] == image_digest
@ -1390,7 +1390,7 @@ def test_batch_delete_image_with_matching_digest_and_tag():
batch_delete_response["imageIds"][2]["imageTag"],
} == set(tags)
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 0
@ -1418,10 +1418,10 @@ def test_batch_delete_image_with_mismatched_digest_and_tag():
imageIds=[{"imageDigest": image_digest, "imageTag": "v2"}],
)
assert type(batch_delete_response["imageIds"]) == list
assert isinstance(batch_delete_response["imageIds"], list)
assert len(batch_delete_response["imageIds"]) == 0
assert type(batch_delete_response["failures"]) == list
assert isinstance(batch_delete_response["failures"], list)
assert len(batch_delete_response["failures"]) == 1
assert (