moto/moto/iam/access_control.py

455 lines
17 KiB
Python

"""
This implementation is NOT complete, there are many things to improve.
The following is a list of the most important missing features and inaccuracies.
TODO add support for more principals, apart from IAM users and assumed IAM roles
TODO add support for the Resource and Condition parts of IAM policies
TODO add support and create tests for all services in moto (for example, API Gateway is probably not supported currently)
TODO implement service specific error messages (currently, EC2 and S3 are supported separately, everything else defaults to the errors IAM returns)
TODO include information about the action's resource in error messages (once the Resource element in IAM policies is supported)
TODO check all other actions that are performed by the action called by the user (for example, autoscaling:CreateAutoScalingGroup requires permission for iam:CreateServiceLinkedRole too - see https://docs.aws.amazon.com/autoscaling/ec2/userguide/control-access-using-iam.html)
TODO add support for resource-based policies
"""
import json
import logging
import re
from abc import abstractmethod, ABCMeta
from enum import Enum
from typing import Any, Dict, Optional, Match, List, Union
from botocore.auth import SigV4Auth, S3SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from moto.core.exceptions import (
SignatureDoesNotMatchError,
AccessDeniedError,
InvalidClientTokenIdError,
AuthFailureError,
)
from moto.s3.exceptions import (
BucketAccessDeniedError,
S3AccessDeniedError,
BucketInvalidTokenError,
S3InvalidTokenError,
S3InvalidAccessKeyIdError,
BucketInvalidAccessKeyIdError,
BucketSignatureDoesNotMatchError,
S3SignatureDoesNotMatchError,
)
from moto.sts.models import sts_backends
from .models import iam_backends, Policy, IAMBackend
log = logging.getLogger(__name__)
def create_access_key(
account_id: str, access_key_id: str, headers: Dict[str, str]
) -> Union["IAMUserAccessKey", "AssumedRoleAccessKey"]:
if access_key_id.startswith("AKIA") or "X-Amz-Security-Token" not in headers:
return IAMUserAccessKey(account_id, access_key_id, headers)
else:
return AssumedRoleAccessKey(account_id, access_key_id, headers)
class IAMUserAccessKey:
@property
def backend(self) -> IAMBackend:
return iam_backends[self.account_id]["global"]
def __init__(self, account_id: str, access_key_id: str, headers: Dict[str, str]):
self.account_id = account_id
iam_users = self.backend.list_users("/", None, None)
for iam_user in iam_users:
for access_key in iam_user.access_keys:
if access_key.access_key_id == access_key_id:
self._owner_user_name = iam_user.name
self._access_key_id = access_key_id
self._secret_access_key = access_key.secret_access_key
if "X-Amz-Security-Token" in headers:
raise CreateAccessKeyFailure(reason="InvalidToken")
return
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self) -> str:
return f"arn:aws:iam::{self.account_id}:user/{self._owner_user_name}"
def create_credentials(self) -> Credentials:
return Credentials(self._access_key_id, self._secret_access_key)
def collect_policies(self) -> List[Dict[str, str]]:
user_policies = []
inline_policy_names = self.backend.list_user_policies(self._owner_user_name)
for inline_policy_name in inline_policy_names:
inline_policy = self.backend.get_user_policy(
self._owner_user_name, inline_policy_name
)
user_policies.append(inline_policy)
attached_policies, _ = self.backend.list_attached_user_policies(
self._owner_user_name
)
user_policies += attached_policies
user_groups = self.backend.get_groups_for_user(self._owner_user_name)
for user_group in user_groups:
inline_group_policy_names = self.backend.list_group_policies(
user_group.name
)
for inline_group_policy_name in inline_group_policy_names:
inline_user_group_policy = self.backend.get_group_policy(
user_group.name, inline_group_policy_name
)
user_policies.append(inline_user_group_policy)
attached_group_policies, _ = self.backend.list_attached_group_policies(
user_group.name
)
user_policies += attached_group_policies
return user_policies
class AssumedRoleAccessKey:
@property
def backend(self) -> IAMBackend:
return iam_backends[self.account_id]["global"]
def __init__(self, account_id: str, access_key_id: str, headers: Dict[str, str]):
self.account_id = account_id
for assumed_role in sts_backends[account_id]["global"].assumed_roles:
if assumed_role.access_key_id == access_key_id:
self._access_key_id = access_key_id
self._secret_access_key = assumed_role.secret_access_key
self._session_token = assumed_role.session_token
self._owner_role_name = assumed_role.role_arn.split("/")[-1]
self._session_name = assumed_role.session_name
if headers["X-Amz-Security-Token"] != self._session_token:
raise CreateAccessKeyFailure(reason="InvalidToken")
return
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self) -> str:
return f"arn:aws:sts::{self.account_id}:assumed-role/{self._owner_role_name}/{self._session_name}"
def create_credentials(self) -> Credentials:
return Credentials(
self._access_key_id, self._secret_access_key, self._session_token
)
def collect_policies(self) -> List[str]:
role_policies = []
inline_policy_names = self.backend.list_role_policies(self._owner_role_name)
for inline_policy_name in inline_policy_names:
_, inline_policy = self.backend.get_role_policy(
self._owner_role_name, inline_policy_name
)
role_policies.append(inline_policy)
attached_policies, _ = self.backend.list_attached_role_policies(
self._owner_role_name
)
role_policies += attached_policies # type: ignore[arg-type]
return role_policies
class CreateAccessKeyFailure(Exception):
def __init__(self, reason: str):
super().__init__()
self.reason = reason
class IAMRequestBase(object, metaclass=ABCMeta):
def __init__(
self,
account_id: str,
method: str,
path: str,
data: Dict[str, str],
body: bytes,
headers: Dict[str, str],
):
log.debug(
f"Creating {self.__class__.__name__} with method={method}, path={path}, data={data}, headers={headers}"
)
self.account_id = account_id
self._method = method
self._path = path
self._data = data
self._body = body
self._headers = headers
credential_scope = self._get_string_between(
"Credential=", ",", self._headers["Authorization"]
)
credential_data = credential_scope.split("/")
self._region = credential_data[2]
self._service = credential_data[3]
action_from_request = self._action_from_request()
self._action = (
self._service
+ ":"
+ (
action_from_request[0]
if isinstance(action_from_request, list)
else action_from_request
)
)
try:
self._access_key = create_access_key(
account_id=self.account_id,
access_key_id=credential_data[0],
headers=headers,
)
except CreateAccessKeyFailure as e:
self._raise_invalid_access_key(e.reason)
def _action_from_request(self) -> str:
if "X-Amz-Target" in self._headers:
return self._headers["X-Amz-Target"].split(".")[-1]
return self._data["Action"]
def check_signature(self) -> None:
original_signature = self._get_string_between(
"Signature=", ",", self._headers["Authorization"]
)
calculated_signature = self._calculate_signature()
if original_signature != calculated_signature:
self._raise_signature_does_not_match()
def check_action_permitted(self, resource: str) -> None:
if (
self._action == "sts:GetCallerIdentity"
): # always allowed, even if there's an explicit Deny for it
return
policies = self._access_key.collect_policies()
permitted = False
for policy in policies:
iam_policy = IAMPolicy(policy)
permission_result = iam_policy.is_action_permitted(self._action, resource)
if permission_result == PermissionResult.DENIED:
self._raise_access_denied()
elif permission_result == PermissionResult.PERMITTED:
permitted = True
if not permitted:
self._raise_access_denied()
@abstractmethod
def _raise_signature_does_not_match(self) -> None:
raise NotImplementedError()
@abstractmethod
def _raise_access_denied(self) -> None:
raise NotImplementedError()
@abstractmethod
def _raise_invalid_access_key(self, reason: str) -> None:
raise NotImplementedError()
@abstractmethod
def _create_auth(self, credentials: Credentials) -> SigV4Auth: # type: ignore[misc]
raise NotImplementedError()
@staticmethod
def _create_headers_for_aws_request(
signed_headers: List[str], original_headers: Dict[str, str]
) -> Dict[str, str]:
headers = {}
for key, value in original_headers.items():
if key.lower() in signed_headers:
headers[key] = value
return headers
def _create_aws_request(self) -> AWSRequest:
signed_headers = self._get_string_between(
"SignedHeaders=", ",", self._headers["Authorization"]
).split(";")
headers = self._create_headers_for_aws_request(signed_headers, self._headers)
request = AWSRequest(
method=self._method,
url=self._path,
data=self._body or self._data,
headers=headers,
)
request.context["timestamp"] = headers["X-Amz-Date"]
return request
def _calculate_signature(self) -> str:
credentials = self._access_key.create_credentials()
auth = self._create_auth(credentials)
request = self._create_aws_request()
canonical_request = auth.canonical_request(request)
string_to_sign = auth.string_to_sign(request, canonical_request)
return auth.signature(string_to_sign, request)
@staticmethod
def _get_string_between(
first_separator: str, second_separator: str, string: str
) -> str:
return string.partition(first_separator)[2].partition(second_separator)[0]
class IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self) -> None:
if self._service == "ec2":
raise AuthFailureError()
else:
raise SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, _: str) -> None:
if self._service == "ec2":
raise AuthFailureError()
else:
raise InvalidClientTokenIdError()
def _create_auth(self, credentials: Any) -> SigV4Auth:
return SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self) -> None:
raise AccessDeniedError(user_arn=self._access_key.arn, action=self._action)
class S3IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self) -> None:
if "BucketName" in self._data:
raise BucketSignatureDoesNotMatchError(bucket=self._data["BucketName"])
else:
raise S3SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, reason: str) -> None:
if reason == "InvalidToken":
if "BucketName" in self._data:
raise BucketInvalidTokenError(bucket=self._data["BucketName"])
else:
raise S3InvalidTokenError()
else:
if "BucketName" in self._data:
raise BucketInvalidAccessKeyIdError(bucket=self._data["BucketName"])
else:
raise S3InvalidAccessKeyIdError()
def _create_auth(self, credentials: Any) -> S3SigV4Auth:
return S3SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self) -> None:
if "BucketName" in self._data:
raise BucketAccessDeniedError(bucket=self._data["BucketName"])
else:
raise S3AccessDeniedError()
class IAMPolicy:
def __init__(self, policy: Any):
if isinstance(policy, Policy):
default_version = next(
policy_version
for policy_version in policy.versions
if policy_version.is_default
)
policy_document = default_version.document
elif isinstance(policy, str):
policy_document = policy
else:
policy_document = policy["policy_document"]
self._policy_json = json.loads(policy_document)
def is_action_permitted(
self, action: str, resource: str = "*"
) -> "PermissionResult":
permitted = False
if isinstance(self._policy_json["Statement"], list):
for policy_statement in self._policy_json["Statement"]:
iam_policy_statement = IAMPolicyStatement(policy_statement)
permission_result = iam_policy_statement.is_action_permitted(
action, resource
)
if permission_result == PermissionResult.DENIED:
return permission_result
elif permission_result == PermissionResult.PERMITTED:
permitted = True
else: # dict
iam_policy_statement = IAMPolicyStatement(self._policy_json["Statement"])
return iam_policy_statement.is_action_permitted(action, resource)
if permitted:
return PermissionResult.PERMITTED
else:
return PermissionResult.NEUTRAL
class IAMPolicyStatement:
def __init__(self, statement: Any):
self._statement = statement
def is_action_permitted(
self, action: str, resource: str = "*"
) -> "PermissionResult":
is_action_concerned = False
if "NotAction" in self._statement:
if not self._check_element_matches("NotAction", action):
is_action_concerned = True
else: # Action is present
if self._check_element_matches("Action", action):
is_action_concerned = True
if is_action_concerned:
if self.is_unknown_principal(self._statement.get("Principal")):
return PermissionResult.NEUTRAL
same_resource = self._check_element_matches("Resource", resource)
if not same_resource:
return PermissionResult.NEUTRAL
if self._statement["Effect"] == "Allow" and same_resource:
return PermissionResult.PERMITTED
else: # Deny
return PermissionResult.DENIED
else:
return PermissionResult.NEUTRAL
def is_unknown_principal(self, principal: Optional[str]) -> bool:
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-bucket-user-policy-specifying-principal-intro.html
# For now, Moto only verifies principal == *
# 'Unknown' principals are not verified
#
# This should be extended to check:
# - Can the principal be empty? How behaves AWS?
# - allow one/multiple account ARN's
# - allow one/multiple rules
if principal is None:
return False
if isinstance(principal, str) and principal != "*":
return True
return False
def _check_element_matches(self, statement_element: Any, value: str) -> bool:
if isinstance(self._statement[statement_element], list):
for statement_element_value in self._statement[statement_element]:
if self._match(statement_element_value, value):
return True
return False
else: # string
return self._match(self._statement[statement_element], value) is not None
@staticmethod
def _match(pattern: str, string: str) -> Optional[Match[str]]:
pattern = pattern.replace("*", ".*")
pattern = f"^{pattern}$"
return re.match(pattern, string)
class PermissionResult(Enum):
PERMITTED = 1
DENIED = 2
NEUTRAL = 3