Techdebt: MyPy IAM (#6066)

This commit is contained in:
Bert Blommers 2023-03-14 18:52:07 -01:00 committed by GitHub
parent f1fcbfaa00
commit fe8d30ae63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1047 additions and 803 deletions

View File

@ -108,7 +108,7 @@ class ConfigQueryModel:
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
aggregator: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Dict[str, Any]], str]:
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
"""For AWS Config. This will list all of the resources of the given type and optional resource name and region.
This supports both aggregated and non-aggregated listing. The following notes the difference:
@ -165,7 +165,7 @@ class ConfigQueryModel:
resource_name: Optional[str] = None,
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
) -> Dict[str, Any]:
) -> Optional[Dict[str, Any]]:
"""For AWS Config. This will query the backend for the specific resource type configuration.
This supports both aggregated, and non-aggregated fetching -- for batched fetching -- the Config batching requests
@ -194,7 +194,7 @@ class ConfigQueryModel:
raise NotImplementedError()
class CloudWatchMetricProvider(object):
class CloudWatchMetricProvider:
@staticmethod
@abstractmethod
def get_cloudwatch_metrics(account_id: str) -> Any: # type: ignore[misc]

View File

@ -154,7 +154,9 @@ def iso_8601_datetime_with_nanoseconds(value: datetime.datetime) -> str:
return value.strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
def iso_8601_datetime_without_milliseconds(value: datetime.datetime) -> Optional[str]:
def iso_8601_datetime_without_milliseconds(
value: Optional[datetime.datetime],
) -> Optional[str]:
return value.strftime("%Y-%m-%dT%H:%M:%SZ") if value else None

View File

@ -17,6 +17,7 @@ import logging
import re
from abc import abstractmethod, ABCMeta
from enum import Enum
from typing import Any, Dict, Optional, Match, List, Union
from botocore.auth import SigV4Auth, S3SigV4Auth
from botocore.awsrequest import AWSRequest
@ -39,12 +40,14 @@ from moto.s3.exceptions import (
S3SignatureDoesNotMatchError,
)
from moto.sts.models import sts_backends
from .models import iam_backends, Policy
from .models import iam_backends, Policy, IAMBackend
log = logging.getLogger(__name__)
def create_access_key(account_id, access_key_id, headers):
def create_access_key(
account_id: str, access_key_id: str, headers: Dict[str, str]
) -> Union["IAMUserAccessKey", "AssumedRoleAccessKey"]:
if access_key_id.startswith("AKIA") or "X-Amz-Security-Token" not in headers:
return IAMUserAccessKey(account_id, access_key_id, headers)
else:
@ -53,10 +56,10 @@ def create_access_key(account_id, access_key_id, headers):
class IAMUserAccessKey:
@property
def backend(self):
def backend(self) -> IAMBackend:
return iam_backends[self.account_id]["global"]
def __init__(self, account_id, access_key_id, headers):
def __init__(self, account_id: str, access_key_id: str, headers: Dict[str, str]):
self.account_id = account_id
iam_users = self.backend.list_users("/", None, None)
@ -72,13 +75,13 @@ class IAMUserAccessKey:
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self):
def arn(self) -> str:
return f"arn:aws:iam::{self.account_id}:user/{self._owner_user_name}"
def create_credentials(self):
def create_credentials(self) -> Credentials:
return Credentials(self._access_key_id, self._secret_access_key)
def collect_policies(self):
def collect_policies(self) -> List[Dict[str, str]]:
user_policies = []
inline_policy_names = self.backend.list_user_policies(self._owner_user_name)
@ -112,12 +115,12 @@ class IAMUserAccessKey:
return user_policies
class AssumedRoleAccessKey(object):
class AssumedRoleAccessKey:
@property
def backend(self):
def backend(self) -> IAMBackend: # type: ignore[misc]
return iam_backends[self.account_id]["global"]
def __init__(self, account_id, access_key_id, headers):
def __init__(self, account_id: str, access_key_id: str, headers: Dict[str, str]):
self.account_id = account_id
for assumed_role in sts_backends[account_id]["global"].assumed_roles:
if assumed_role.access_key_id == access_key_id:
@ -132,15 +135,15 @@ class AssumedRoleAccessKey(object):
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self):
def arn(self) -> str:
return f"arn:aws:sts::{self.account_id}:assumed-role/{self._owner_role_name}/{self._session_name}"
def create_credentials(self):
def create_credentials(self) -> Credentials:
return Credentials(
self._access_key_id, self._secret_access_key, self._session_token
)
def collect_policies(self):
def collect_policies(self) -> List[str]:
role_policies = []
inline_policy_names = self.backend.list_role_policies(self._owner_role_name)
@ -153,19 +156,26 @@ class AssumedRoleAccessKey(object):
attached_policies, _ = self.backend.list_attached_role_policies(
self._owner_role_name
)
role_policies += attached_policies
role_policies += attached_policies # type: ignore[arg-type]
return role_policies
class CreateAccessKeyFailure(Exception):
def __init__(self, reason, *args):
super().__init__(*args)
def __init__(self, reason: str):
super().__init__()
self.reason = reason
class IAMRequestBase(object, metaclass=ABCMeta):
def __init__(self, account_id, method, path, data, headers):
def __init__(
self,
account_id: str,
method: str,
path: str,
data: Dict[str, str],
headers: Dict[str, str],
):
log.debug(
f"Creating {self.__class__.__name__} with method={method}, path={path}, data={data}, headers={headers}"
)
@ -198,7 +208,7 @@ class IAMRequestBase(object, metaclass=ABCMeta):
except CreateAccessKeyFailure as e:
self._raise_invalid_access_key(e.reason)
def check_signature(self):
def check_signature(self) -> None:
original_signature = self._get_string_between(
"Signature=", ",", self._headers["Authorization"]
)
@ -206,11 +216,11 @@ class IAMRequestBase(object, metaclass=ABCMeta):
if original_signature != calculated_signature:
self._raise_signature_does_not_match()
def check_action_permitted(self):
def check_action_permitted(self) -> None:
if (
self._action == "sts:GetCallerIdentity"
): # always allowed, even if there's an explicit Deny for it
return True
return
policies = self._access_key.collect_policies()
permitted = False
@ -226,30 +236,32 @@ class IAMRequestBase(object, metaclass=ABCMeta):
self._raise_access_denied()
@abstractmethod
def _raise_signature_does_not_match(self):
def _raise_signature_does_not_match(self) -> None:
raise NotImplementedError()
@abstractmethod
def _raise_access_denied(self):
def _raise_access_denied(self) -> None:
raise NotImplementedError()
@abstractmethod
def _raise_invalid_access_key(self, reason):
def _raise_invalid_access_key(self, reason: str) -> None:
raise NotImplementedError()
@abstractmethod
def _create_auth(self, credentials):
def _create_auth(self, credentials: Credentials) -> SigV4Auth: # type: ignore[misc]
raise NotImplementedError()
@staticmethod
def _create_headers_for_aws_request(signed_headers, original_headers):
def _create_headers_for_aws_request(
signed_headers: List[str], original_headers: Dict[str, str]
) -> Dict[str, str]:
headers = {}
for key, value in original_headers.items():
if key.lower() in signed_headers:
headers[key] = value
return headers
def _create_aws_request(self):
def _create_aws_request(self) -> AWSRequest:
signed_headers = self._get_string_between(
"SignedHeaders=", ",", self._headers["Authorization"]
).split(";")
@ -261,7 +273,7 @@ class IAMRequestBase(object, metaclass=ABCMeta):
return request
def _calculate_signature(self):
def _calculate_signature(self) -> str:
credentials = self._access_key.create_credentials()
auth = self._create_auth(credentials)
request = self._create_aws_request()
@ -270,38 +282,40 @@ class IAMRequestBase(object, metaclass=ABCMeta):
return auth.signature(string_to_sign, request)
@staticmethod
def _get_string_between(first_separator, second_separator, string):
def _get_string_between(
first_separator: str, second_separator: str, string: str
) -> str:
return string.partition(first_separator)[2].partition(second_separator)[0]
class IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self):
def _raise_signature_does_not_match(self) -> None:
if self._service == "ec2":
raise AuthFailureError()
else:
raise SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, _):
def _raise_invalid_access_key(self, _: str) -> None:
if self._service == "ec2":
raise AuthFailureError()
else:
raise InvalidClientTokenIdError()
def _create_auth(self, credentials):
def _create_auth(self, credentials: Any) -> SigV4Auth:
return SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self):
def _raise_access_denied(self) -> None:
raise AccessDeniedError(user_arn=self._access_key.arn, action=self._action)
class S3IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self):
def _raise_signature_does_not_match(self) -> None:
if "BucketName" in self._data:
raise BucketSignatureDoesNotMatchError(bucket=self._data["BucketName"])
else:
raise S3SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, reason):
def _raise_invalid_access_key(self, reason: str) -> None:
if reason == "InvalidToken":
if "BucketName" in self._data:
raise BucketInvalidTokenError(bucket=self._data["BucketName"])
@ -313,18 +327,18 @@ class S3IAMRequest(IAMRequestBase):
else:
raise S3InvalidAccessKeyIdError()
def _create_auth(self, credentials):
def _create_auth(self, credentials: Any) -> S3SigV4Auth:
return S3SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self):
def _raise_access_denied(self) -> None:
if "BucketName" in self._data:
raise BucketAccessDeniedError(bucket=self._data["BucketName"])
else:
raise S3AccessDeniedError()
class IAMPolicy(object):
def __init__(self, policy):
class IAMPolicy:
def __init__(self, policy: Any):
if isinstance(policy, Policy):
default_version = next(
policy_version
@ -337,9 +351,11 @@ class IAMPolicy(object):
else:
policy_document = policy["policy_document"]
self._policy_json = json.loads(policy_document)
self._policy_json = json.loads(policy_document) # type: ignore[arg-type]
def is_action_permitted(self, action, resource="*"):
def is_action_permitted(
self, action: str, resource: str = "*"
) -> "PermissionResult":
permitted = False
if isinstance(self._policy_json["Statement"], list):
for policy_statement in self._policy_json["Statement"]:
@ -361,11 +377,13 @@ class IAMPolicy(object):
return PermissionResult.NEUTRAL
class IAMPolicyStatement(object):
def __init__(self, statement):
class IAMPolicyStatement:
def __init__(self, statement: Any):
self._statement = statement
def is_action_permitted(self, action, resource="*"):
def is_action_permitted(
self, action: str, resource: str = "*"
) -> "PermissionResult":
is_action_concerned = False
if "NotAction" in self._statement:
@ -386,7 +404,7 @@ class IAMPolicyStatement(object):
else:
return PermissionResult.NEUTRAL
def is_unknown_principal(self, principal) -> bool:
def is_unknown_principal(self, principal: Optional[str]) -> bool:
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-bucket-user-policy-specifying-principal-intro.html
# For now, Moto only verifies principal == *
# 'Unknown' principals are not verified
@ -401,17 +419,17 @@ class IAMPolicyStatement(object):
return True
return False
def _check_element_matches(self, statement_element, value):
def _check_element_matches(self, statement_element: Any, value: str) -> bool:
if isinstance(self._statement[statement_element], list):
for statement_element_value in self._statement[statement_element]:
if self._match(statement_element_value, value):
return True
return False
else: # string
return self._match(self._statement[statement_element], value)
return self._match(self._statement[statement_element], value) is not None
@staticmethod
def _match(pattern, string):
def _match(pattern: str, string: str) -> Optional[Match[str]]:
pattern = pattern.replace("*", ".*")
pattern = f"^{pattern}$"
return re.match(pattern, string)

View File

@ -1,5 +1,6 @@
import json
import boto3
from typing import Any, Dict, List, Optional, Tuple
from moto.core.exceptions import InvalidNextTokenException
from moto.core.common_models import ConfigQueryModel
from moto.iam import iam_backends
@ -8,15 +9,15 @@ from moto.iam import iam_backends
class RoleConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
account_id,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
account_id: str,
resource_ids: Optional[List[str]],
resource_name: Optional[str],
limit: int,
next_token: Optional[str],
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
aggregator: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
# IAM roles are "global" and aren't assigned into any availability zone
# The resource ID is a AWS-assigned random string like "AROA0BSVNSZKXVHS00SBJ"
# The resource name is a user-assigned string like "MyDevelopmentAdminRole"
@ -43,7 +44,7 @@ class RoleConfigQuery(ConfigQueryModel):
return [], None
else:
for role in role_list:
if role.id in resource_ids:
if role.id in resource_ids: # type: ignore[operator]
filtered_roles.append(role)
# Filtered roles are now the subject for the listing
@ -60,7 +61,7 @@ class RoleConfigQuery(ConfigQueryModel):
aggregator_sources = aggregator.get(
"account_aggregation_sources"
) or aggregator.get("organization_aggregation_source")
for source in aggregator_sources:
for source in aggregator_sources: # type: ignore[union-attr]
source_dict = source.__dict__
if source_dict.get("all_aws_regions", False):
aggregated_regions = boto3.Session().get_available_regions("config")
@ -86,7 +87,7 @@ class RoleConfigQuery(ConfigQueryModel):
else:
# Non-aggregated queries are in the else block, and we can treat these like a normal config resource
# Pagination logic, sort by role id
sorted_roles = sorted(role_list, key=lambda role: role.id)
sorted_roles = sorted(role_list, key=lambda role: role.id) # type: ignore[attr-defined]
new_token = None
@ -99,7 +100,7 @@ class RoleConfigQuery(ConfigQueryModel):
start = next(
index
for (index, r) in enumerate(sorted_roles)
if next_token == (r["_id"] if aggregator else r.id)
if next_token == (r["_id"] if aggregator else r.id) # type: ignore[attr-defined]
)
except StopIteration:
raise InvalidNextTokenException()
@ -109,14 +110,14 @@ class RoleConfigQuery(ConfigQueryModel):
if len(sorted_roles) > (start + limit):
record = sorted_roles[start + limit]
new_token = record["_id"] if aggregator else record.id
new_token = record["_id"] if aggregator else record.id # type: ignore[attr-defined]
return (
[
{
"type": "AWS::IAM::Role",
"id": role["id"] if aggregator else role.id,
"name": role["name"] if aggregator else role.name,
"id": role["id"] if aggregator else role.id, # type: ignore[attr-defined]
"name": role["name"] if aggregator else role.name, # type: ignore[attr-defined]
"region": role["region"] if aggregator else "global",
}
for role in role_list
@ -126,20 +127,20 @@ class RoleConfigQuery(ConfigQueryModel):
def get_config_resource(
self,
account_id,
resource_id,
resource_name=None,
backend_region=None,
resource_region=None,
):
account_id: str,
resource_id: str,
resource_name: Optional[str] = None,
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
role = self.backends[account_id]["global"].roles.get(resource_id, {})
if not role:
return
return None
if resource_name and role.name != resource_name:
return
return None
# Format the role to the AWS Config format:
config_data = role.to_config_dict()
@ -158,15 +159,15 @@ class RoleConfigQuery(ConfigQueryModel):
class PolicyConfigQuery(ConfigQueryModel):
def list_config_service_resources(
self,
account_id,
resource_ids,
resource_name,
limit,
next_token,
backend_region=None,
resource_region=None,
aggregator=None,
):
account_id: str,
resource_ids: Optional[List[str]],
resource_name: Optional[str],
limit: int,
next_token: Optional[str],
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
aggregator: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
# IAM policies are "global" and aren't assigned into any availability zone
# The resource ID is a AWS-assigned random string like "ANPA0BSVNSZK00SJSPVUJ"
# The resource name is a user-assigned string like "my-development-policy"
@ -206,7 +207,7 @@ class PolicyConfigQuery(ConfigQueryModel):
else:
for policy in policy_list:
if policy.id in resource_ids:
if policy.id in resource_ids: # type: ignore[operator]
filtered_policies.append(policy)
# Filtered roles are now the subject for the listing
@ -223,7 +224,7 @@ class PolicyConfigQuery(ConfigQueryModel):
aggregator_sources = aggregator.get(
"account_aggregation_sources"
) or aggregator.get("organization_aggregation_source")
for source in aggregator_sources:
for source in aggregator_sources: # type: ignore[union-attr]
source_dict = source.__dict__
if source_dict.get("all_aws_regions", False):
aggregated_regions = boto3.Session().get_available_regions("config")
@ -252,7 +253,7 @@ class PolicyConfigQuery(ConfigQueryModel):
else:
# Non-aggregated queries are in the else block, and we can treat these like a normal config resource
# Pagination logic, sort by role id
sorted_policies = sorted(policy_list, key=lambda role: role.id)
sorted_policies = sorted(policy_list, key=lambda role: role.id) # type: ignore[attr-defined]
new_token = None
@ -265,7 +266,7 @@ class PolicyConfigQuery(ConfigQueryModel):
start = next(
index
for (index, p) in enumerate(sorted_policies)
if next_token == (p["_id"] if aggregator else p.id)
if next_token == (p["_id"] if aggregator else p.id) # type: ignore[attr-defined]
)
except StopIteration:
raise InvalidNextTokenException()
@ -275,14 +276,14 @@ class PolicyConfigQuery(ConfigQueryModel):
if len(sorted_policies) > (start + limit):
record = sorted_policies[start + limit]
new_token = record["_id"] if aggregator else record.id
new_token = record["_id"] if aggregator else record.id # type: ignore[attr-defined]
return (
[
{
"type": "AWS::IAM::Policy",
"id": policy["id"] if aggregator else policy.id,
"name": policy["name"] if aggregator else policy.name,
"id": policy["id"] if aggregator else policy.id, # type: ignore[attr-defined]
"name": policy["name"] if aggregator else policy.name, # type: ignore[attr-defined]
"region": policy["region"] if aggregator else "global",
}
for policy in policy_list
@ -292,12 +293,12 @@ class PolicyConfigQuery(ConfigQueryModel):
def get_config_resource(
self,
account_id,
resource_id,
resource_name=None,
backend_region=None,
resource_region=None,
):
account_id: str,
resource_id: str,
resource_name: Optional[str] = None,
backend_region: Optional[str] = None,
resource_region: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
# policies are listed in the backend as arns, but we have to accept the PolicyID as the resource_id
# we'll make a really crude search for it
policy = None
@ -308,10 +309,10 @@ class PolicyConfigQuery(ConfigQueryModel):
break
if not policy:
return
return None
if resource_name and policy.name != resource_name:
return
return None
# Format the policy to the AWS Config format:
config_data = policy.to_config_dict()

View File

@ -1,3 +1,4 @@
from typing import Any
from moto.core.exceptions import RESTError
XMLNS_IAM = "https://iam.amazonaws.com/doc/2010-05-08/"
@ -15,28 +16,28 @@ class IAMNotFoundException(RESTError):
class IAMConflictException(RESTError):
code = 409
def __init__(self, code="Conflict", message=""):
def __init__(self, code: str = "Conflict", message: str = ""):
super().__init__(code, message)
class IAMReportNotPresentException(RESTError):
code = 410
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ReportNotPresent", message)
class IAMLimitExceededException(RESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("LimitExceeded", message)
class MalformedCertificate(RESTError):
code = 400
def __init__(self, cert):
def __init__(self, cert: str):
super().__init__("MalformedCertificate", f"Certificate {cert} is malformed")
@ -55,7 +56,7 @@ class MalformedPolicyDocument(RESTError):
class DuplicateTags(RESTError):
code = 400
def __init__(self):
def __init__(self) -> None:
super().__init__(
"InvalidInput",
"Duplicate tag keys found. Please note that Tag keys are case insensitive.",
@ -65,7 +66,7 @@ class DuplicateTags(RESTError):
class TagKeyTooBig(RESTError):
code = 400
def __init__(self, tag, param="tags.X.member.key"):
def __init__(self, tag: str, param: str = "tags.X.member.key"):
super().__init__(
"ValidationError",
f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy "
@ -76,7 +77,7 @@ class TagKeyTooBig(RESTError):
class TagValueTooBig(RESTError):
code = 400
def __init__(self, tag):
def __init__(self, tag: str):
super().__init__(
"ValidationError",
f"1 validation error detected: Value '{tag}' at 'tags.X.member.value' failed to satisfy "
@ -87,7 +88,7 @@ class TagValueTooBig(RESTError):
class InvalidTagCharacters(RESTError):
code = 400
def __init__(self, tag, param="tags.X.member.key"):
def __init__(self, tag: str, param: str = "tags.X.member.key"):
message = f"1 validation error detected: Value '{tag}' at '{param}' failed to satisfy constraint: Member must satisfy regular expression pattern: [\\p{{L}}\\p{{Z}}\\p{{N}}_.:/=+\\-@]+"
super().__init__("ValidationError", message)
@ -96,7 +97,7 @@ class InvalidTagCharacters(RESTError):
class TooManyTags(RESTError):
code = 400
def __init__(self, tags, param="tags"):
def __init__(self, tags: Any, param: str = "tags"):
super().__init__(
"ValidationError",
f"1 validation error detected: Value '{tags}' at '{param}' failed to satisfy "
@ -107,28 +108,28 @@ class TooManyTags(RESTError):
class EntityAlreadyExists(RESTError):
code = 409
def __init__(self, message):
def __init__(self, message: str):
super().__init__("EntityAlreadyExists", message)
class ValidationError(RESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("ValidationError", message)
class InvalidInput(RESTError):
code = 400
def __init__(self, message):
def __init__(self, message: str):
super().__init__("InvalidInput", message)
class NoSuchEntity(RESTError):
code = 404
def __init__(self, message):
def __init__(self, message: str):
super().__init__(
"NoSuchEntity", message, xmlns=XMLNS_IAM, template="wrapped_single_error"
)

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
import json
import re
from typing import Any, Dict, List
from moto.iam.exceptions import MalformedPolicyDocument
@ -61,7 +61,7 @@ SERVICE_TYPE_REGION_INFORMATION_ERROR_ASSOCIATIONS = {
"s3": "Resource {resource} can not contain region information.",
}
VALID_RESOURCE_PATH_STARTING_VALUES = {
VALID_RESOURCE_PATH_STARTING_VALUES: Dict[str, Any] = {
"iam": {
"values": [
"user/",
@ -84,13 +84,13 @@ VALID_RESOURCE_PATH_STARTING_VALUES = {
class BaseIAMPolicyValidator:
def __init__(self, policy_document):
def __init__(self, policy_document: str):
self._policy_document = policy_document
self._policy_json = {}
self._statements = []
self._policy_json: Dict[str, Any] = {}
self._statements: List[Dict[str, Any]] = []
self._resource_error = "" # the first resource error found that does not generate a legacy parsing error
def validate(self):
def validate(self) -> None:
try:
self._validate_syntax()
except Exception:
@ -124,7 +124,7 @@ class BaseIAMPolicyValidator:
self._validate_actions_for_prefixes()
self._validate_not_actions_for_prefixes()
def _validate_syntax(self):
def _validate_syntax(self) -> None:
self._policy_json = json.loads(self._policy_document)
assert isinstance(self._policy_json, dict)
self._validate_top_elements()
@ -132,19 +132,19 @@ class BaseIAMPolicyValidator:
self._validate_id_syntax()
self._validate_statements_syntax()
def _validate_top_elements(self):
def _validate_top_elements(self) -> None:
top_elements = self._policy_json.keys()
for element in top_elements:
assert element in VALID_TOP_ELEMENTS
def _validate_version_syntax(self):
def _validate_version_syntax(self) -> None:
if "Version" in self._policy_json:
assert self._policy_json["Version"] in VALID_VERSIONS
def _validate_version(self):
def _validate_version(self) -> None:
assert self._policy_json["Version"] == "2012-10-17"
def _validate_sid_uniqueness(self):
def _validate_sid_uniqueness(self) -> None:
sids = []
for statement in self._statements:
if "Sid" in statement:
@ -153,7 +153,7 @@ class BaseIAMPolicyValidator:
assert statementId not in sids
sids.append(statementId)
def _validate_statements_syntax(self):
def _validate_statements_syntax(self) -> None:
assert "Statement" in self._policy_json
assert isinstance(self._policy_json["Statement"], (dict, list))
@ -167,7 +167,7 @@ class BaseIAMPolicyValidator:
self._validate_statement_syntax(statement)
@staticmethod
def _validate_statement_syntax(statement):
def _validate_statement_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
assert isinstance(statement, dict)
for statement_element in statement.keys():
assert statement_element in VALID_STATEMENT_ELEMENTS
@ -184,7 +184,7 @@ class BaseIAMPolicyValidator:
IAMPolicyDocumentValidator._validate_sid_syntax(statement)
@staticmethod
def _validate_effect_syntax(statement):
def _validate_effect_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
assert "Effect" in statement
assert isinstance(statement["Effect"], str)
assert statement["Effect"].lower() in [
@ -192,31 +192,31 @@ class BaseIAMPolicyValidator:
]
@staticmethod
def _validate_action_syntax(statement):
def _validate_action_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
statement, "Action"
)
@staticmethod
def _validate_not_action_syntax(statement):
def _validate_not_action_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
statement, "NotAction"
)
@staticmethod
def _validate_resource_syntax(statement):
def _validate_resource_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
statement, "Resource"
)
@staticmethod
def _validate_not_resource_syntax(statement):
def _validate_not_resource_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
IAMPolicyDocumentValidator._validate_string_or_list_of_strings_syntax(
statement, "NotResource"
)
@staticmethod
def _validate_string_or_list_of_strings_syntax(statement, key):
def _validate_string_or_list_of_strings_syntax(statement: Dict[str, Any], key: str) -> None: # type: ignore[misc]
if key in statement:
assert isinstance(statement[key], (str, list))
if isinstance(statement[key], list):
@ -224,7 +224,7 @@ class BaseIAMPolicyValidator:
assert isinstance(resource, str)
@staticmethod
def _validate_condition_syntax(statement):
def _validate_condition_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
if "Condition" in statement:
assert isinstance(statement["Condition"], dict)
for condition_key, condition_value in statement["Condition"].items():
@ -239,7 +239,7 @@ class BaseIAMPolicyValidator:
assert not condition_value # empty dict
@staticmethod
def _strip_condition_key(condition_key):
def _strip_condition_key(condition_key: str) -> str:
for valid_prefix in VALID_CONDITION_PREFIXES:
if condition_key.startswith(valid_prefix):
condition_key = condition_key[len(valid_prefix) :]
@ -253,15 +253,15 @@ class BaseIAMPolicyValidator:
return condition_key
@staticmethod
def _validate_sid_syntax(statement):
def _validate_sid_syntax(statement: Dict[str, Any]) -> None: # type: ignore[misc]
if "Sid" in statement:
assert isinstance(statement["Sid"], str)
def _validate_id_syntax(self):
def _validate_id_syntax(self) -> None:
if "Id" in self._policy_json:
assert isinstance(self._policy_json["Id"], str)
def _validate_resource_exist(self):
def _validate_resource_exist(self) -> None:
for statement in self._statements:
assert "Resource" in statement or "NotResource" in statement
if "Resource" in statement and isinstance(statement["Resource"], list):
@ -271,7 +271,7 @@ class BaseIAMPolicyValidator:
):
assert statement["NotResource"]
def _validate_action_like_exist(self):
def _validate_action_like_exist(self) -> None:
for statement in self._statements:
assert "Action" in statement or "NotAction" in statement
if "Action" in statement and isinstance(statement["Action"], list):
@ -279,13 +279,13 @@ class BaseIAMPolicyValidator:
elif "NotAction" in statement and isinstance(statement["NotAction"], list):
assert statement["NotAction"]
def _validate_actions_for_prefixes(self):
def _validate_actions_for_prefixes(self) -> None:
self._validate_action_like_for_prefixes("Action")
def _validate_not_actions_for_prefixes(self):
def _validate_not_actions_for_prefixes(self) -> None:
self._validate_action_like_for_prefixes("NotAction")
def _validate_action_like_for_prefixes(self, key):
def _validate_action_like_for_prefixes(self, key: str) -> None:
for statement in self._statements:
if key in statement:
if isinstance(statement[key], str):
@ -295,7 +295,7 @@ class BaseIAMPolicyValidator:
self._validate_action_prefix(action)
@staticmethod
def _validate_action_prefix(action):
def _validate_action_prefix(action: str) -> None:
action_parts = action.split(":")
if len(action_parts) == 1 and action_parts[0] != "*":
raise MalformedPolicyDocument(
@ -310,13 +310,13 @@ class BaseIAMPolicyValidator:
if action_parts[0] != "*" and vendor_pattern.search(action_parts[0]):
raise MalformedPolicyDocument(f"Vendor {action_parts[0]} is not valid")
def _validate_resources_for_formats(self):
def _validate_resources_for_formats(self) -> None:
self._validate_resource_like_for_formats("Resource")
def _validate_not_resources_for_formats(self):
def _validate_not_resources_for_formats(self) -> None:
self._validate_resource_like_for_formats("NotResource")
def _validate_resource_like_for_formats(self, key):
def _validate_resource_like_for_formats(self, key: str) -> None:
for statement in self._statements:
if key in statement:
if isinstance(statement[key], str):
@ -329,7 +329,7 @@ class BaseIAMPolicyValidator:
statement, key
)
def _validate_resource_format(self, resource):
def _validate_resource_format(self, resource: str) -> None:
if resource != "*":
resource_partitions = resource.partition(":")
@ -407,13 +407,13 @@ class BaseIAMPolicyValidator:
)
)
def _perform_first_legacy_parsing(self):
def _perform_first_legacy_parsing(self) -> None:
"""This method excludes legacy parsing resources, since that have to be done later."""
for statement in self._statements:
self._legacy_parse_statement(statement)
@staticmethod
def _legacy_parse_statement(statement):
def _legacy_parse_statement(statement: Dict[str, Any]) -> None: # type: ignore[misc]
assert statement["Effect"] in VALID_EFFECTS # case-sensitive matching
if "Condition" in statement:
for condition_key, condition_value in statement["Condition"].items():
@ -422,7 +422,7 @@ class BaseIAMPolicyValidator:
)
@staticmethod
def _legacy_parse_resource_like(statement, key):
def _legacy_parse_resource_like(statement: Dict[str, Any], key: str) -> None: # type: ignore[misc]
if isinstance(statement[key], str):
if statement[key] != "*":
assert statement[key].count(":") >= 5 or "::" not in statement[key]
@ -434,7 +434,7 @@ class BaseIAMPolicyValidator:
assert resource[2] != ""
@staticmethod
def _legacy_parse_condition(condition_key, condition_value):
def _legacy_parse_condition(condition_key: str, condition_value: Dict[str, Any]) -> None: # type: ignore[misc]
stripped_condition_key = IAMPolicyDocumentValidator._strip_condition_key(
condition_key
)
@ -452,7 +452,7 @@ class BaseIAMPolicyValidator:
)
@staticmethod
def _legacy_parse_date_condition_value(date_condition_value):
def _legacy_parse_date_condition_value(date_condition_value: str) -> None:
if "t" in date_condition_value.lower() or "-" in date_condition_value:
IAMPolicyDocumentValidator._validate_iso_8601_datetime(
date_condition_value.lower()
@ -461,7 +461,7 @@ class BaseIAMPolicyValidator:
assert 0 <= int(date_condition_value) <= 9223372036854775807
@staticmethod
def _validate_iso_8601_datetime(datetime):
def _validate_iso_8601_datetime(datetime: str) -> None:
datetime_parts = datetime.partition("t")
negative_year = datetime_parts[0].startswith("-")
date_parts = (
@ -525,10 +525,10 @@ class IAMPolicyDocumentValidator(BaseIAMPolicyValidator):
class IAMTrustPolicyDocumentValidator(BaseIAMPolicyValidator):
def __init__(self, policy_document):
def __init__(self, policy_document: str):
super().__init__(policy_document)
def validate(self):
def validate(self) -> None:
super().validate()
try:
for statement in self._statements:
@ -551,12 +551,12 @@ class IAMTrustPolicyDocumentValidator(BaseIAMPolicyValidator):
except Exception:
raise MalformedPolicyDocument("Has prohibited field Resource.")
def _validate_resource_not_exist(self):
def _validate_resource_not_exist(self) -> None:
for statement in self._statements:
assert "Resource" not in statement and "NotResource" not in statement
@staticmethod
def _validate_trust_policy_action(action):
def _validate_trust_policy_action(action: str) -> None:
# https://docs.aws.amazon.com/service-authorization/latest/reference/list_awssecuritytokenservice.html
assert action in (
"sts:AssumeRole",

View File

@ -1,59 +1,59 @@
from moto.core.responses import BaseResponse
from .models import iam_backends, User
from .models import iam_backends, IAMBackend, User
class IamResponse(BaseResponse):
def __init__(self):
def __init__(self) -> None:
super().__init__(service_name="iam")
@property
def backend(self):
def backend(self) -> IAMBackend:
return iam_backends[self.current_account]["global"]
def attach_role_policy(self):
def attach_role_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
role_name = self._get_param("RoleName")
self.backend.attach_role_policy(policy_arn, role_name)
template = self.response_template(ATTACH_ROLE_POLICY_TEMPLATE)
return template.render()
def detach_role_policy(self):
def detach_role_policy(self) -> str:
role_name = self._get_param("RoleName")
policy_arn = self._get_param("PolicyArn")
self.backend.detach_role_policy(policy_arn, role_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DetachRolePolicy")
def attach_group_policy(self):
def attach_group_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
group_name = self._get_param("GroupName")
self.backend.attach_group_policy(policy_arn, group_name)
template = self.response_template(ATTACH_GROUP_POLICY_TEMPLATE)
return template.render()
def detach_group_policy(self):
def detach_group_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
group_name = self._get_param("GroupName")
self.backend.detach_group_policy(policy_arn, group_name)
template = self.response_template(DETACH_GROUP_POLICY_TEMPLATE)
return template.render()
def attach_user_policy(self):
def attach_user_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
user_name = self._get_param("UserName")
self.backend.attach_user_policy(policy_arn, user_name)
template = self.response_template(ATTACH_USER_POLICY_TEMPLATE)
return template.render()
def detach_user_policy(self):
def detach_user_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
user_name = self._get_param("UserName")
self.backend.detach_user_policy(policy_arn, user_name)
template = self.response_template(DETACH_USER_POLICY_TEMPLATE)
return template.render()
def create_policy(self):
def create_policy(self) -> str:
description = self._get_param("Description")
path = self._get_param("Path")
policy_document = self._get_param("PolicyDocument")
@ -65,13 +65,13 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_POLICY_TEMPLATE)
return template.render(policy=policy)
def get_policy(self):
def get_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
policy = self.backend.get_policy(policy_arn)
template = self.response_template(GET_POLICY_TEMPLATE)
return template.render(policy=policy)
def list_attached_role_policies(self):
def list_attached_role_policies(self) -> str:
marker = self._get_param("Marker")
max_items = self._get_int_param("MaxItems", 100)
path_prefix = self._get_param("PathPrefix", "/")
@ -82,7 +82,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ATTACHED_ROLE_POLICIES_TEMPLATE)
return template.render(policies=policies, marker=marker)
def list_attached_group_policies(self):
def list_attached_group_policies(self) -> str:
marker = self._get_param("Marker")
max_items = self._get_int_param("MaxItems", 100)
path_prefix = self._get_param("PathPrefix", "/")
@ -93,7 +93,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ATTACHED_GROUP_POLICIES_TEMPLATE)
return template.render(policies=policies, marker=marker)
def list_attached_user_policies(self):
def list_attached_user_policies(self) -> str:
marker = self._get_param("Marker")
max_items = self._get_int_param("MaxItems", 100)
path_prefix = self._get_param("PathPrefix", "/")
@ -104,7 +104,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ATTACHED_USER_POLICIES_TEMPLATE)
return template.render(policies=policies, marker=marker)
def list_policies(self):
def list_policies(self) -> str:
marker = self._get_param("Marker")
max_items = self._get_int_param("MaxItems", 100)
only_attached = self._get_bool_param("OnlyAttached", False)
@ -116,7 +116,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_POLICIES_TEMPLATE)
return template.render(policies=policies, marker=marker)
def list_entities_for_policy(self):
def list_entities_for_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
# Options 'User'|'Role'|'Group'|'LocalManagedPolicy'|'AWSManagedPolicy
@ -181,14 +181,14 @@ class IamResponse(BaseResponse):
roles=entity_roles, users=entity_users, groups=entity_groups
)
def set_default_policy_version(self):
def set_default_policy_version(self) -> str:
policy_arn = self._get_param("PolicyArn")
version_id = self._get_param("VersionId")
self.backend.set_default_policy_version(policy_arn, version_id)
template = self.response_template(SET_DEFAULT_POLICY_VERSION_TEMPLATE)
return template.render()
def create_role(self):
def create_role(self) -> str:
role_name = self._get_param("RoleName")
path = self._get_param("Path")
assume_role_policy_document = self._get_param("AssumeRolePolicyDocument")
@ -209,26 +209,26 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_ROLE_TEMPLATE)
return template.render(role=role)
def get_role(self):
def get_role(self) -> str:
role_name = self._get_param("RoleName")
role = self.backend.get_role(role_name)
template = self.response_template(GET_ROLE_TEMPLATE)
return template.render(role=role)
def delete_role(self):
def delete_role(self) -> str:
role_name = self._get_param("RoleName")
self.backend.delete_role(role_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteRole")
def list_role_policies(self):
def list_role_policies(self) -> str:
role_name = self._get_param("RoleName")
role_policies_names = self.backend.list_role_policies(role_name)
template = self.response_template(LIST_ROLE_POLICIES)
return template.render(role_policies=role_policies_names)
def put_role_policy(self):
def put_role_policy(self) -> str:
role_name = self._get_param("RoleName")
policy_name = self._get_param("PolicyName")
policy_document = self._get_param("PolicyDocument")
@ -236,14 +236,14 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="PutRolePolicy")
def delete_role_policy(self):
def delete_role_policy(self) -> str:
role_name = self._get_param("RoleName")
policy_name = self._get_param("PolicyName")
self.backend.delete_role_policy(role_name, policy_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteRolePolicy")
def get_role_policy(self):
def get_role_policy(self) -> str:
role_name = self._get_param("RoleName")
policy_name = self._get_param("PolicyName")
policy_name, policy_document = self.backend.get_role_policy(
@ -256,21 +256,21 @@ class IamResponse(BaseResponse):
policy_document=policy_document,
)
def update_assume_role_policy(self):
def update_assume_role_policy(self) -> str:
role_name = self._get_param("RoleName")
policy_document = self._get_param("PolicyDocument")
self.backend.update_assume_role_policy(role_name, policy_document)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="UpdateAssumeRolePolicy")
def update_role_description(self):
def update_role_description(self) -> str:
role_name = self._get_param("RoleName")
description = self._get_param("Description")
role = self.backend.update_role_description(role_name, description)
template = self.response_template(UPDATE_ROLE_DESCRIPTION_TEMPLATE)
return template.render(role=role)
def update_role(self):
def update_role(self) -> str:
role_name = self._get_param("RoleName")
description = self._get_param("Description")
max_session_duration = self._get_param("MaxSessionDuration", 3600)
@ -278,20 +278,20 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_ROLE_TEMPLATE)
return template.render(role=role)
def put_role_permissions_boundary(self):
def put_role_permissions_boundary(self) -> str:
permissions_boundary = self._get_param("PermissionsBoundary")
role_name = self._get_param("RoleName")
self.backend.put_role_permissions_boundary(role_name, permissions_boundary)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="PutRolePermissionsBoundary")
def delete_role_permissions_boundary(self):
def delete_role_permissions_boundary(self) -> str:
role_name = self._get_param("RoleName")
self.backend.delete_role_permissions_boundary(role_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteRolePermissionsBoundary")
def create_policy_version(self):
def create_policy_version(self) -> str:
policy_arn = self._get_param("PolicyArn")
policy_document = self._get_param("PolicyDocument")
set_as_default = self._get_param("SetAsDefault")
@ -301,21 +301,21 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_POLICY_VERSION_TEMPLATE)
return template.render(policy_version=policy_version)
def get_policy_version(self):
def get_policy_version(self) -> str:
policy_arn = self._get_param("PolicyArn")
version_id = self._get_param("VersionId")
policy_version = self.backend.get_policy_version(policy_arn, version_id)
template = self.response_template(GET_POLICY_VERSION_TEMPLATE)
return template.render(policy_version=policy_version)
def list_policy_versions(self):
def list_policy_versions(self) -> str:
policy_arn = self._get_param("PolicyArn")
policy_versions = self.backend.list_policy_versions(policy_arn)
template = self.response_template(LIST_POLICY_VERSIONS_TEMPLATE)
return template.render(policy_versions=policy_versions)
def list_policy_tags(self):
def list_policy_tags(self) -> str:
policy_arn = self._get_param("PolicyArn")
marker = self._get_param("Marker")
max_items = self._get_param("MaxItems", 100)
@ -325,7 +325,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_POLICY_TAG_TEMPLATE)
return template.render(tags=tags, marker=marker)
def tag_policy(self):
def tag_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
tags = self._get_multi_param("Tags.member")
@ -334,7 +334,7 @@ class IamResponse(BaseResponse):
template = self.response_template(TAG_POLICY_TEMPLATE)
return template.render()
def untag_policy(self):
def untag_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
tag_keys = self._get_multi_param("TagKeys.member")
@ -343,7 +343,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UNTAG_POLICY_TEMPLATE)
return template.render()
def delete_policy_version(self):
def delete_policy_version(self) -> str:
policy_arn = self._get_param("PolicyArn")
version_id = self._get_param("VersionId")
@ -351,7 +351,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeletePolicyVersion")
def create_instance_profile(self):
def create_instance_profile(self) -> str:
profile_name = self._get_param("InstanceProfileName")
path = self._get_param("Path", "/")
tags = self._get_multi_param("Tags.member")
@ -362,21 +362,21 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_INSTANCE_PROFILE_TEMPLATE)
return template.render(profile=profile)
def delete_instance_profile(self):
def delete_instance_profile(self) -> str:
profile_name = self._get_param("InstanceProfileName")
profile = self.backend.delete_instance_profile(profile_name)
self.backend.delete_instance_profile(profile_name)
template = self.response_template(DELETE_INSTANCE_PROFILE_TEMPLATE)
return template.render(profile=profile)
return template.render()
def get_instance_profile(self):
def get_instance_profile(self) -> str:
profile_name = self._get_param("InstanceProfileName")
profile = self.backend.get_instance_profile(profile_name)
template = self.response_template(GET_INSTANCE_PROFILE_TEMPLATE)
return template.render(profile=profile)
def add_role_to_instance_profile(self):
def add_role_to_instance_profile(self) -> str:
profile_name = self._get_param("InstanceProfileName")
role_name = self._get_param("RoleName")
@ -384,7 +384,7 @@ class IamResponse(BaseResponse):
template = self.response_template(ADD_ROLE_TO_INSTANCE_PROFILE_TEMPLATE)
return template.render()
def remove_role_from_instance_profile(self):
def remove_role_from_instance_profile(self) -> str:
profile_name = self._get_param("InstanceProfileName")
role_name = self._get_param("RoleName")
@ -392,7 +392,7 @@ class IamResponse(BaseResponse):
template = self.response_template(REMOVE_ROLE_FROM_INSTANCE_PROFILE_TEMPLATE)
return template.render()
def list_roles(self):
def list_roles(self) -> str:
path_prefix = self._get_param("PathPrefix", "/")
marker = self._get_param("Marker", "0")
max_items = self._get_param("MaxItems", 100)
@ -401,20 +401,20 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ROLES_TEMPLATE)
return template.render(roles=roles, marker=marker)
def list_instance_profiles(self):
def list_instance_profiles(self) -> str:
profiles = self.backend.get_instance_profiles()
template = self.response_template(LIST_INSTANCE_PROFILES_TEMPLATE)
return template.render(instance_profiles=profiles)
def list_instance_profiles_for_role(self):
def list_instance_profiles_for_role(self) -> str:
role_name = self._get_param("RoleName")
profiles = self.backend.get_instance_profiles_for_role(role_name=role_name)
template = self.response_template(LIST_INSTANCE_PROFILES_FOR_ROLE_TEMPLATE)
return template.render(instance_profiles=profiles)
def upload_server_certificate(self):
def upload_server_certificate(self) -> str:
cert_name = self._get_param("ServerCertificateName")
cert_body = self._get_param("CertificateBody")
path = self._get_param("Path")
@ -427,24 +427,24 @@ class IamResponse(BaseResponse):
template = self.response_template(UPLOAD_CERT_TEMPLATE)
return template.render(certificate=cert)
def list_server_certificates(self):
def list_server_certificates(self) -> str:
certs = self.backend.list_server_certificates()
template = self.response_template(LIST_SERVER_CERTIFICATES_TEMPLATE)
return template.render(server_certificates=certs)
def get_server_certificate(self):
def get_server_certificate(self) -> str:
cert_name = self._get_param("ServerCertificateName")
cert = self.backend.get_server_certificate(cert_name)
template = self.response_template(GET_SERVER_CERTIFICATE_TEMPLATE)
return template.render(certificate=cert)
def delete_server_certificate(self):
def delete_server_certificate(self) -> str:
cert_name = self._get_param("ServerCertificateName")
self.backend.delete_server_certificate(cert_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteServerCertificate")
def create_group(self):
def create_group(self) -> str:
group_name = self._get_param("GroupName")
path = self._get_param("Path", "/")
@ -452,26 +452,26 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_GROUP_TEMPLATE)
return template.render(group=group)
def get_group(self):
def get_group(self) -> str:
group_name = self._get_param("GroupName")
group = self.backend.get_group(group_name)
template = self.response_template(GET_GROUP_TEMPLATE)
return template.render(group=group)
def list_groups(self):
def list_groups(self) -> str:
groups = self.backend.list_groups()
template = self.response_template(LIST_GROUPS_TEMPLATE)
return template.render(groups=groups)
def list_groups_for_user(self):
def list_groups_for_user(self) -> str:
user_name = self._get_param("UserName")
groups = self.backend.get_groups_for_user(user_name)
template = self.response_template(LIST_GROUPS_FOR_USER_TEMPLATE)
return template.render(groups=groups)
def put_group_policy(self):
def put_group_policy(self) -> str:
group_name = self._get_param("GroupName")
policy_name = self._get_param("PolicyName")
policy_document = self._get_param("PolicyDocument")
@ -479,7 +479,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="PutGroupPolicy")
def list_group_policies(self):
def list_group_policies(self) -> str:
group_name = self._get_param("GroupName")
marker = self._get_param("Marker")
policies = self.backend.list_group_policies(group_name)
@ -488,27 +488,27 @@ class IamResponse(BaseResponse):
name="ListGroupPoliciesResponse", policies=policies, marker=marker
)
def get_group_policy(self):
def get_group_policy(self) -> str:
group_name = self._get_param("GroupName")
policy_name = self._get_param("PolicyName")
policy_result = self.backend.get_group_policy(group_name, policy_name)
template = self.response_template(GET_GROUP_POLICY_TEMPLATE)
return template.render(name="GetGroupPolicyResponse", **policy_result)
def delete_group_policy(self):
def delete_group_policy(self) -> str:
group_name = self._get_param("GroupName")
policy_name = self._get_param("PolicyName")
self.backend.delete_group_policy(group_name, policy_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteGroupPolicy")
def delete_group(self):
def delete_group(self) -> str:
group_name = self._get_param("GroupName")
self.backend.delete_group(group_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteGroup")
def update_group(self):
def update_group(self) -> str:
group_name = self._get_param("GroupName")
new_group_name = self._get_param("NewGroupName")
new_path = self._get_param("NewPath")
@ -516,7 +516,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="UpdateGroup")
def create_user(self):
def create_user(self) -> str:
user_name = self._get_param("UserName")
path = self._get_param("Path")
tags = self._get_multi_param("Tags.member")
@ -524,7 +524,7 @@ class IamResponse(BaseResponse):
template = self.response_template(USER_TEMPLATE)
return template.render(action="Create", user=user, tags=user_tags["Tags"])
def get_user(self):
def get_user(self) -> str:
user_name = self._get_param("UserName")
if not user_name:
access_key_id = self.get_access_key()
@ -537,7 +537,7 @@ class IamResponse(BaseResponse):
template = self.response_template(USER_TEMPLATE)
return template.render(action="Get", user=user, tags=tags)
def list_users(self):
def list_users(self) -> str:
path_prefix = self._get_param("PathPrefix")
marker = self._get_param("Marker")
max_items = self._get_param("MaxItems")
@ -545,7 +545,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_USERS_TEMPLATE)
return template.render(action="List", users=users, isTruncated=False)
def update_user(self):
def update_user(self) -> str:
user_name = self._get_param("UserName")
new_path = self._get_param("NewPath")
new_user_name = self._get_param("NewUserName")
@ -557,7 +557,7 @@ class IamResponse(BaseResponse):
template = self.response_template(USER_TEMPLATE)
return template.render(action="Update", user=user)
def create_login_profile(self):
def create_login_profile(self) -> str:
user_name = self._get_param("UserName")
password = self._get_param("Password")
user = self.backend.create_login_profile(user_name, password)
@ -565,14 +565,14 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_LOGIN_PROFILE_TEMPLATE)
return template.render(user=user)
def get_login_profile(self):
def get_login_profile(self) -> str:
user_name = self._get_param("UserName")
user = self.backend.get_login_profile(user_name)
template = self.response_template(GET_LOGIN_PROFILE_TEMPLATE)
return template.render(user=user)
def update_login_profile(self):
def update_login_profile(self) -> str:
user_name = self._get_param("UserName")
password = self._get_param("Password")
password_reset_required = self._get_param("PasswordResetRequired")
@ -583,7 +583,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_LOGIN_PROFILE_TEMPLATE)
return template.render(user=user)
def add_user_to_group(self):
def add_user_to_group(self) -> str:
group_name = self._get_param("GroupName")
user_name = self._get_param("UserName")
@ -591,7 +591,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="AddUserToGroup")
def remove_user_from_group(self):
def remove_user_from_group(self) -> str:
group_name = self._get_param("GroupName")
user_name = self._get_param("UserName")
@ -599,7 +599,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="RemoveUserFromGroup")
def get_user_policy(self):
def get_user_policy(self) -> str:
user_name = self._get_param("UserName")
policy_name = self._get_param("PolicyName")
@ -611,19 +611,19 @@ class IamResponse(BaseResponse):
policy_document=policy_document.get("policy_document"),
)
def list_user_policies(self):
def list_user_policies(self) -> str:
user_name = self._get_param("UserName")
policies = self.backend.list_user_policies(user_name)
template = self.response_template(LIST_USER_POLICIES_TEMPLATE)
return template.render(policies=policies)
def list_user_tags(self):
def list_user_tags(self) -> str:
user_name = self._get_param("UserName")
tags = self.backend.list_user_tags(user_name)
template = self.response_template(LIST_USER_TAGS_TEMPLATE)
return template.render(user_tags=tags["Tags"])
def put_user_policy(self):
def put_user_policy(self) -> str:
user_name = self._get_param("UserName")
policy_name = self._get_param("PolicyName")
policy_document = self._get_param("PolicyDocument")
@ -632,7 +632,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="PutUserPolicy")
def delete_user_policy(self):
def delete_user_policy(self) -> str:
user_name = self._get_param("UserName")
policy_name = self._get_param("PolicyName")
@ -640,7 +640,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteUserPolicy")
def create_access_key(self):
def create_access_key(self) -> str:
user_name = self._get_param("UserName")
if not user_name:
access_key_id = self.get_access_key()
@ -651,7 +651,7 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_ACCESS_KEY_TEMPLATE)
return template.render(key=key)
def update_access_key(self):
def update_access_key(self) -> str:
user_name = self._get_param("UserName")
access_key_id = self._get_param("AccessKeyId")
status = self._get_param("Status")
@ -663,7 +663,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="UpdateAccessKey")
def get_access_key_last_used(self):
def get_access_key_last_used(self) -> str:
access_key_id = self._get_param("AccessKeyId")
last_used_response = self.backend.get_access_key_last_used(access_key_id)
template = self.response_template(GET_ACCESS_KEY_LAST_USED_TEMPLATE)
@ -672,7 +672,7 @@ class IamResponse(BaseResponse):
last_used=last_used_response["last_used"],
)
def list_access_keys(self):
def list_access_keys(self) -> str:
user_name = self._get_param("UserName")
if not user_name:
access_key_id = self.get_access_key()
@ -683,7 +683,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ACCESS_KEYS_TEMPLATE)
return template.render(user_name=user_name, keys=keys)
def delete_access_key(self):
def delete_access_key(self) -> str:
user_name = self._get_param("UserName")
access_key_id = self._get_param("AccessKeyId")
if not user_name:
@ -694,7 +694,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteAccessKey")
def upload_ssh_public_key(self):
def upload_ssh_public_key(self) -> str:
user_name = self._get_param("UserName")
ssh_public_key_body = self._get_param("SSHPublicKeyBody")
@ -702,7 +702,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UPLOAD_SSH_PUBLIC_KEY_TEMPLATE)
return template.render(key=key)
def get_ssh_public_key(self):
def get_ssh_public_key(self) -> str:
user_name = self._get_param("UserName")
ssh_public_key_id = self._get_param("SSHPublicKeyId")
@ -710,14 +710,14 @@ class IamResponse(BaseResponse):
template = self.response_template(GET_SSH_PUBLIC_KEY_TEMPLATE)
return template.render(key=key)
def list_ssh_public_keys(self):
def list_ssh_public_keys(self) -> str:
user_name = self._get_param("UserName")
keys = self.backend.get_all_ssh_public_keys(user_name)
template = self.response_template(LIST_SSH_PUBLIC_KEYS_TEMPLATE)
return template.render(keys=keys)
def update_ssh_public_key(self):
def update_ssh_public_key(self) -> str:
user_name = self._get_param("UserName")
ssh_public_key_id = self._get_param("SSHPublicKeyId")
status = self._get_param("Status")
@ -726,7 +726,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_SSH_PUBLIC_KEY_TEMPLATE)
return template.render()
def delete_ssh_public_key(self):
def delete_ssh_public_key(self) -> str:
user_name = self._get_param("UserName")
ssh_public_key_id = self._get_param("SSHPublicKeyId")
@ -734,7 +734,7 @@ class IamResponse(BaseResponse):
template = self.response_template(DELETE_SSH_PUBLIC_KEY_TEMPLATE)
return template.render()
def deactivate_mfa_device(self):
def deactivate_mfa_device(self) -> str:
user_name = self._get_param("UserName")
serial_number = self._get_param("SerialNumber")
@ -742,7 +742,7 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeactivateMFADevice")
def enable_mfa_device(self):
def enable_mfa_device(self) -> str:
user_name = self._get_param("UserName")
serial_number = self._get_param("SerialNumber")
authentication_code_1 = self._get_param("AuthenticationCode1")
@ -754,13 +754,13 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="EnableMFADevice")
def list_mfa_devices(self):
def list_mfa_devices(self) -> str:
user_name = self._get_param("UserName")
devices = self.backend.list_mfa_devices(user_name)
template = self.response_template(LIST_MFA_DEVICES_TEMPLATE)
return template.render(user_name=user_name, devices=devices)
def create_virtual_mfa_device(self):
def create_virtual_mfa_device(self) -> str:
path = self._get_param("Path")
virtual_mfa_device_name = self._get_param("VirtualMFADeviceName")
@ -771,7 +771,7 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_VIRTUAL_MFA_DEVICE_TEMPLATE)
return template.render(device=virtual_mfa_device)
def delete_virtual_mfa_device(self):
def delete_virtual_mfa_device(self) -> str:
serial_number = self._get_param("SerialNumber")
self.backend.delete_virtual_mfa_device(serial_number)
@ -779,7 +779,7 @@ class IamResponse(BaseResponse):
template = self.response_template(DELETE_VIRTUAL_MFA_DEVICE_TEMPLATE)
return template.render()
def list_virtual_mfa_devices(self):
def list_virtual_mfa_devices(self) -> str:
assignment_status = self._get_param("AssignmentStatus", "Any")
marker = self._get_param("Marker")
max_items = self._get_param("MaxItems", 100)
@ -791,25 +791,25 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_VIRTUAL_MFA_DEVICES_TEMPLATE)
return template.render(devices=devices, marker=marker)
def delete_user(self):
def delete_user(self) -> str:
user_name = self._get_param("UserName")
self.backend.delete_user(user_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteUser")
def delete_policy(self):
def delete_policy(self) -> str:
policy_arn = self._get_param("PolicyArn")
self.backend.delete_policy(policy_arn)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeletePolicy")
def delete_login_profile(self):
def delete_login_profile(self) -> str:
user_name = self._get_param("UserName")
self.backend.delete_login_profile(user_name)
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteLoginProfile")
def generate_credential_report(self):
def generate_credential_report(self) -> str:
if self.backend.report_generated():
template = self.response_template(CREDENTIAL_REPORT_GENERATED)
else:
@ -817,28 +817,28 @@ class IamResponse(BaseResponse):
self.backend.generate_report()
return template.render()
def get_credential_report(self):
def get_credential_report(self) -> str:
report = self.backend.get_credential_report()
template = self.response_template(CREDENTIAL_REPORT)
return template.render(report=report)
def list_account_aliases(self):
def list_account_aliases(self) -> str:
aliases = self.backend.list_account_aliases()
template = self.response_template(LIST_ACCOUNT_ALIASES_TEMPLATE)
return template.render(aliases=aliases)
def create_account_alias(self):
def create_account_alias(self) -> str:
alias = self._get_param("AccountAlias")
self.backend.create_account_alias(alias)
template = self.response_template(CREATE_ACCOUNT_ALIAS_TEMPLATE)
return template.render()
def delete_account_alias(self):
def delete_account_alias(self) -> str:
self.backend.delete_account_alias()
template = self.response_template(DELETE_ACCOUNT_ALIAS_TEMPLATE)
return template.render()
def get_account_authorization_details(self):
def get_account_authorization_details(self) -> str:
filter_param = self._get_multi_param("Filter.member")
account_details = self.backend.get_account_authorization_details(filter_param)
template = self.response_template(GET_ACCOUNT_AUTHORIZATION_DETAILS_TEMPLATE)
@ -852,7 +852,7 @@ class IamResponse(BaseResponse):
list_tags_for_user=self.backend.list_user_tags,
)
def create_saml_provider(self):
def create_saml_provider(self) -> str:
saml_provider_name = self._get_param("Name")
saml_metadata_document = self._get_param("SAMLMetadataDocument")
saml_provider = self.backend.create_saml_provider(
@ -862,7 +862,7 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_SAML_PROVIDER_TEMPLATE)
return template.render(saml_provider=saml_provider)
def update_saml_provider(self):
def update_saml_provider(self) -> str:
saml_provider_arn = self._get_param("SAMLProviderArn")
saml_metadata_document = self._get_param("SAMLMetadataDocument")
saml_provider = self.backend.update_saml_provider(
@ -872,27 +872,27 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_SAML_PROVIDER_TEMPLATE)
return template.render(saml_provider=saml_provider)
def delete_saml_provider(self):
def delete_saml_provider(self) -> str:
saml_provider_arn = self._get_param("SAMLProviderArn")
self.backend.delete_saml_provider(saml_provider_arn)
template = self.response_template(DELETE_SAML_PROVIDER_TEMPLATE)
return template.render()
def list_saml_providers(self):
def list_saml_providers(self) -> str:
saml_providers = self.backend.list_saml_providers()
template = self.response_template(LIST_SAML_PROVIDERS_TEMPLATE)
return template.render(saml_providers=saml_providers)
def get_saml_provider(self):
def get_saml_provider(self) -> str:
saml_provider_arn = self._get_param("SAMLProviderArn")
saml_provider = self.backend.get_saml_provider(saml_provider_arn)
template = self.response_template(GET_SAML_PROVIDER_TEMPLATE)
return template.render(saml_provider=saml_provider)
def upload_signing_certificate(self):
def upload_signing_certificate(self) -> str:
user_name = self._get_param("UserName")
cert_body = self._get_param("CertificateBody")
@ -900,7 +900,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UPLOAD_SIGNING_CERTIFICATE_TEMPLATE)
return template.render(cert=cert)
def update_signing_certificate(self):
def update_signing_certificate(self) -> str:
user_name = self._get_param("UserName")
cert_id = self._get_param("CertificateId")
status = self._get_param("Status")
@ -909,7 +909,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_SIGNING_CERTIFICATE_TEMPLATE)
return template.render()
def delete_signing_certificate(self):
def delete_signing_certificate(self) -> str:
user_name = self._get_param("UserName")
cert_id = self._get_param("CertificateId")
@ -917,14 +917,14 @@ class IamResponse(BaseResponse):
template = self.response_template(DELETE_SIGNING_CERTIFICATE_TEMPLATE)
return template.render()
def list_signing_certificates(self):
def list_signing_certificates(self) -> str:
user_name = self._get_param("UserName")
certs = self.backend.list_signing_certificates(user_name)
template = self.response_template(LIST_SIGNING_CERTIFICATES_TEMPLATE)
return template.render(user_name=user_name, certificates=certs)
def list_role_tags(self):
def list_role_tags(self) -> str:
role_name = self._get_param("RoleName")
marker = self._get_param("Marker")
max_items = self._get_param("MaxItems", 100)
@ -934,7 +934,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_ROLE_TAG_TEMPLATE)
return template.render(tags=tags, marker=marker)
def tag_role(self):
def tag_role(self) -> str:
role_name = self._get_param("RoleName")
tags = self._get_multi_param("Tags.member")
@ -943,7 +943,7 @@ class IamResponse(BaseResponse):
template = self.response_template(TAG_ROLE_TEMPLATE)
return template.render()
def untag_role(self):
def untag_role(self) -> str:
role_name = self._get_param("RoleName")
tag_keys = self._get_multi_param("TagKeys.member")
@ -952,7 +952,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UNTAG_ROLE_TEMPLATE)
return template.render()
def create_open_id_connect_provider(self):
def create_open_id_connect_provider(self) -> str:
open_id_provider_url = self._get_param("Url")
thumbprint_list = self._get_multi_param("ThumbprintList.member")
client_id_list = self._get_multi_param("ClientIDList.member")
@ -965,7 +965,7 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_OPEN_ID_CONNECT_PROVIDER_TEMPLATE)
return template.render(open_id_provider=open_id_provider)
def update_open_id_connect_provider_thumbprint(self):
def update_open_id_connect_provider_thumbprint(self) -> str:
open_id_provider_arn = self._get_param("OpenIDConnectProviderArn")
thumbprint_list = self._get_multi_param("ThumbprintList.member")
@ -976,7 +976,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_OPEN_ID_CONNECT_PROVIDER_THUMBPRINT)
return template.render()
def tag_open_id_connect_provider(self):
def tag_open_id_connect_provider(self) -> str:
open_id_provider_arn = self._get_param("OpenIDConnectProviderArn")
tags = self._get_multi_param("Tags.member")
@ -985,7 +985,7 @@ class IamResponse(BaseResponse):
template = self.response_template(TAG_OPEN_ID_CONNECT_PROVIDER)
return template.render()
def untag_open_id_connect_provider(self):
def untag_open_id_connect_provider(self) -> str:
open_id_provider_arn = self._get_param("OpenIDConnectProviderArn")
tag_keys = self._get_multi_param("TagKeys.member")
@ -994,7 +994,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UNTAG_OPEN_ID_CONNECT_PROVIDER)
return template.render()
def list_open_id_connect_provider_tags(self):
def list_open_id_connect_provider_tags(self) -> str:
open_id_provider_arn = self._get_param("OpenIDConnectProviderArn")
marker = self._get_param("Marker")
max_items = self._get_param("MaxItems", 100)
@ -1004,7 +1004,7 @@ class IamResponse(BaseResponse):
template = self.response_template(LIST_OPEN_ID_CONNECT_PROVIDER_TAGS)
return template.render(tags=tags, marker=marker)
def delete_open_id_connect_provider(self):
def delete_open_id_connect_provider(self) -> str:
open_id_provider_arn = self._get_param("OpenIDConnectProviderArn")
self.backend.delete_open_id_connect_provider(open_id_provider_arn)
@ -1012,7 +1012,7 @@ class IamResponse(BaseResponse):
template = self.response_template(DELETE_OPEN_ID_CONNECT_PROVIDER_TEMPLATE)
return template.render()
def get_open_id_connect_provider(self):
def get_open_id_connect_provider(self) -> str:
open_id_provider_arn = self._get_param("OpenIDConnectProviderArn")
open_id_provider = self.backend.get_open_id_connect_provider(
@ -1022,13 +1022,13 @@ class IamResponse(BaseResponse):
template = self.response_template(GET_OPEN_ID_CONNECT_PROVIDER_TEMPLATE)
return template.render(open_id_provider=open_id_provider)
def list_open_id_connect_providers(self):
def list_open_id_connect_providers(self) -> str:
open_id_provider_arns = self.backend.list_open_id_connect_providers()
template = self.response_template(LIST_OPEN_ID_CONNECT_PROVIDERS_TEMPLATE)
return template.render(open_id_provider_arns=open_id_provider_arns)
def update_account_password_policy(self):
def update_account_password_policy(self) -> str:
allow_change_password = self._get_bool_param(
"AllowUsersToChangePassword", False
)
@ -1060,25 +1060,25 @@ class IamResponse(BaseResponse):
template = self.response_template(UPDATE_ACCOUNT_PASSWORD_POLICY_TEMPLATE)
return template.render()
def get_account_password_policy(self):
def get_account_password_policy(self) -> str:
account_password_policy = self.backend.get_account_password_policy()
template = self.response_template(GET_ACCOUNT_PASSWORD_POLICY_TEMPLATE)
return template.render(password_policy=account_password_policy)
def delete_account_password_policy(self):
def delete_account_password_policy(self) -> str:
self.backend.delete_account_password_policy()
template = self.response_template(DELETE_ACCOUNT_PASSWORD_POLICY_TEMPLATE)
return template.render()
def get_account_summary(self):
def get_account_summary(self) -> str:
account_summary = self.backend.get_account_summary()
template = self.response_template(GET_ACCOUNT_SUMMARY_TEMPLATE)
return template.render(summary_map=account_summary.summary_map)
def tag_user(self):
def tag_user(self) -> str:
name = self._get_param("UserName")
tags = self._get_multi_param("Tags.member")
@ -1087,7 +1087,7 @@ class IamResponse(BaseResponse):
template = self.response_template(TAG_USER_TEMPLATE)
return template.render()
def untag_user(self):
def untag_user(self) -> str:
name = self._get_param("UserName")
tag_keys = self._get_multi_param("TagKeys.member")
@ -1096,7 +1096,7 @@ class IamResponse(BaseResponse):
template = self.response_template(UNTAG_USER_TEMPLATE)
return template.render()
def create_service_linked_role(self):
def create_service_linked_role(self) -> str:
service_name = self._get_param("AWSServiceName")
description = self._get_param("Description")
suffix = self._get_param("CustomSuffix")
@ -1108,7 +1108,7 @@ class IamResponse(BaseResponse):
template = self.response_template(CREATE_SERVICE_LINKED_ROLE_TEMPLATE)
return template.render(role=role)
def delete_service_linked_role(self):
def delete_service_linked_role(self) -> str:
role_name = self._get_param("RoleName")
deletion_task_id = self.backend.delete_service_linked_role(role_name)
@ -1116,7 +1116,7 @@ class IamResponse(BaseResponse):
template = self.response_template(DELETE_SERVICE_LINKED_ROLE_TEMPLATE)
return template.render(deletion_task_id=deletion_task_id)
def get_service_linked_role_deletion_status(self):
def get_service_linked_role_deletion_status(self) -> str:
self.backend.get_service_linked_role_deletion_status()
template = self.response_template(

View File

@ -6,13 +6,13 @@ AWS_ROLE_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
ACCOUNT_OFFSET = 549755813888 # int.from_bytes(base64.b32decode(b"QAAAAAAA"), byteorder="big"), start value
def _random_uppercase_or_digit_sequence(length):
def _random_uppercase_or_digit_sequence(length: int) -> str:
return "".join(str(random.choice(AWS_ROLE_ALPHABET)) for _ in range(length))
def generate_access_key_id_from_account_id(
account_id: str, prefix: str, total_length: int = 20
):
) -> str:
"""
Generates a key id (e.g. access key id) for the given account id and prefix
@ -21,13 +21,13 @@ def generate_access_key_id_from_account_id(
:param total_length: Total length of the access key (e.g. 20 for temp access keys, 21 for role ids)
:return: Generated id
"""
account_id = int(account_id)
id_with_offset = account_id // 2 + ACCOUNT_OFFSET
account_id_nr = int(account_id)
id_with_offset = account_id_nr // 2 + ACCOUNT_OFFSET
account_bytes = int.to_bytes(id_with_offset, byteorder="big", length=5)
account_part = base64.b32encode(account_bytes).decode("utf-8")
middle_char = (
random.choice(AWS_ROLE_ALPHABET[16:])
if account_id % 2
if account_id_nr % 2
else random.choice(AWS_ROLE_ALPHABET[:16])
)
semi_fixed_part = prefix + account_part + middle_char
@ -36,14 +36,14 @@ def generate_access_key_id_from_account_id(
)
def random_alphanumeric(length):
def random_alphanumeric(length: int) -> str:
return "".join(
str(random.choice(string.ascii_letters + string.digits + "+" + "/"))
for _ in range(length)
)
def random_resource_id(size=20):
def random_resource_id(size: int = 20) -> str:
chars = list(range(10)) + list(string.ascii_lowercase)
return "".join(str(random.choice(chars)) for x in range(size))
@ -55,13 +55,13 @@ def random_role_id(account_id: str) -> str:
)
def random_access_key():
def random_access_key() -> str:
return "".join(
str(random.choice(string.ascii_uppercase + string.digits)) for _ in range(16)
)
def random_policy_id():
def random_policy_id() -> str:
return "A" + "".join(
random.choice(string.ascii_uppercase + string.digits) for _ in range(20)
)

View File

@ -55,12 +55,12 @@ class InvalidArgumentError(S3ClientError):
class AccessForbidden(S3ClientError):
code = 403
def __init__(self, msg):
def __init__(self, msg: str):
super().__init__("AccessForbidden", msg)
class BucketError(S3ClientError):
def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: str):
kwargs.setdefault("template", "bucket_error")
self.templates["bucket_error"] = ERROR_WITH_BUCKET_NAME
super().__init__(*args, **kwargs)
@ -335,45 +335,40 @@ class DuplicateTagKeys(S3ClientError):
class S3AccessDeniedError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: str):
super().__init__("AccessDenied", "Access Denied", *args, **kwargs)
class BucketAccessDeniedError(BucketError):
code = 403
def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: str):
super().__init__("AccessDenied", "Access Denied", *args, **kwargs)
class S3InvalidTokenError(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
def __init__(self) -> None:
super().__init__(
"InvalidToken",
"The provided token is malformed or otherwise invalid.",
*args,
**kwargs,
"InvalidToken", "The provided token is malformed or otherwise invalid."
)
class S3AclAndGrantError(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
def __init__(self) -> None:
super().__init__(
"InvalidRequest",
"Specifying both Canned ACLs and Header Grants is not allowed",
*args,
**kwargs,
)
class BucketInvalidTokenError(BucketError):
code = 400
def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: str):
super().__init__(
"InvalidToken",
"The provided token is malformed or otherwise invalid.",
@ -385,19 +380,17 @@ class BucketInvalidTokenError(BucketError):
class S3InvalidAccessKeyIdError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
def __init__(self) -> None:
super().__init__(
"InvalidAccessKeyId",
"The AWS Access Key Id you provided does not exist in our records.",
*args,
**kwargs,
)
class BucketInvalidAccessKeyIdError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: str):
super().__init__(
"InvalidAccessKeyId",
"The AWS Access Key Id you provided does not exist in our records.",
@ -409,19 +402,17 @@ class BucketInvalidAccessKeyIdError(S3ClientError):
class S3SignatureDoesNotMatchError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
def __init__(self) -> None:
super().__init__(
"SignatureDoesNotMatch",
"The request signature we calculated does not match the signature you provided. Check your key and signing method.",
*args,
**kwargs,
)
class BucketSignatureDoesNotMatchError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
def __init__(self, *args: str, **kwargs: str):
super().__init__(
"SignatureDoesNotMatch",
"The request signature we calculated does not match the signature you provided. Check your key and signing method.",

View File

@ -10,7 +10,6 @@ from moto.sts.utils import (
DEFAULT_STS_SESSION_DURATION,
random_assumed_role_id,
)
from typing import Mapping
class Token(BaseModel):
@ -185,6 +184,6 @@ class STSBackend(BaseBackend):
return account_id, iam_backend.create_temp_access_key()
sts_backends: Mapping[str, STSBackend] = BackendDict(
sts_backends = BackendDict(
STSBackend, "sts", use_boto3_regions=False, additional_regions=["global"]
)

View File

@ -230,7 +230,7 @@ disable = W,C,R,E
enable = anomalous-backslash-in-string, arguments-renamed, dangerous-default-value, deprecated-module, function-redefined, import-self, redefined-builtin, redefined-outer-name, reimported, pointless-statement, super-with-arguments, unused-argument, unused-import, unused-variable, useless-else-on-loop, wildcard-import
[mypy]
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/moto_api,moto/neptune
files= moto/a*,moto/b*,moto/c*,moto/d*,moto/e*,moto/f*,moto/g*,moto/iam,moto/moto_api,moto/neptune
show_column_numbers=True
show_error_codes = True
disable_error_code=abstract