From 9d8c11fdc8570b488f2b3f908f37debebb8db92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Joga?= Date: Thu, 14 Sep 2023 14:48:20 +0200 Subject: [PATCH] IAM validation: Resource is now processed during authorization (#6799) --- moto/core/responses.py | 10 +++++-- moto/iam/access_control.py | 4 ++- moto/sts/responses.py | 5 ++++ tests/test_core/test_auth.py | 41 ++++++++++++++++++++++++++ tests/test_s3/test_s3_bucket_policy.py | 24 +++++++++++---- 5 files changed, 75 insertions(+), 9 deletions(-) diff --git a/moto/core/responses.py b/moto/core/responses.py index 67c2cb6cd..42b852035 100644 --- a/moto/core/responses.py +++ b/moto/core/responses.py @@ -157,10 +157,10 @@ class ActionAuthenticatorMixin(object): else: ActionAuthenticatorMixin.request_count += 1 - def _authenticate_and_authorize_normal_action(self) -> None: + def _authenticate_and_authorize_normal_action(self, resource: str = "*") -> None: from moto.iam.access_control import IAMRequest - self._authenticate_and_authorize_action(IAMRequest) + self._authenticate_and_authorize_action(IAMRequest, resource) def _authenticate_and_authorize_s3_action( self, bucket_name: Optional[str] = None, key_name: Optional[str] = None @@ -491,9 +491,13 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin): def call_action(self) -> TYPE_RESPONSE: headers = self.response_headers + if hasattr(self, "_determine_resource"): + resource = self._determine_resource() + else: + resource = "*" try: - self._authenticate_and_authorize_normal_action() + self._authenticate_and_authorize_normal_action(resource) except HTTPException as http_error: response = http_error.description, dict(status=http_error.code) return self._send_response(headers, response) diff --git a/moto/iam/access_control.py b/moto/iam/access_control.py index 70b506260..2627a4b7a 100644 --- a/moto/iam/access_control.py +++ b/moto/iam/access_control.py @@ -380,7 +380,7 @@ class IAMPolicy: permitted = True else: # dict iam_policy_statement = IAMPolicyStatement(self._policy_json["Statement"]) - return iam_policy_statement.is_action_permitted(action) + return iam_policy_statement.is_action_permitted(action, resource) if permitted: return PermissionResult.PERMITTED @@ -408,6 +408,8 @@ class IAMPolicyStatement: if self.is_unknown_principal(self._statement.get("Principal")): return PermissionResult.NEUTRAL same_resource = self._check_element_matches("Resource", resource) + if not same_resource: + return PermissionResult.NEUTRAL if self._statement["Effect"] == "Allow" and same_resource: return PermissionResult.PERMITTED else: # Deny diff --git a/moto/sts/responses.py b/moto/sts/responses.py index c54481652..7e95e6535 100644 --- a/moto/sts/responses.py +++ b/moto/sts/responses.py @@ -13,6 +13,11 @@ class TokenResponse(BaseResponse): def backend(self) -> STSBackend: return sts_backends[self.current_account]["global"] + def _determine_resource(self) -> str: + if "AssumeRole" in self.querystring.get("Action", []): + return self.querystring.get("RoleArn")[0] # type: ignore[index] + return "*" + def get_session_token(self) -> str: duration = int(self.querystring.get("DurationSeconds", [43200])[0]) token = self.backend.get_session_token(duration=duration) diff --git a/tests/test_core/test_auth.py b/tests/test_core/test_auth.py index b2a051112..8aceb2cf5 100644 --- a/tests/test_core/test_auth.py +++ b/tests/test_core/test_auth.py @@ -291,6 +291,47 @@ def test_access_denied_with_not_allowing_policy(): ) +@set_initial_no_auth_action_count(3) +@mock_sts +def test_access_denied_explicitly_on_specific_resource(): + user_name = "test-user" + forbidden_role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/forbidden_explicitly" + allowed_role_arn = f"arn:aws:iam::{ACCOUNT_ID}:role/allowed_implictly" + role_session_name = "dummy" + inline_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Deny", + "Action": ["sts:AssumeRole"], + "Resource": forbidden_role_arn, + }, + {"Effect": "Allow", "Action": ["sts:AssumeRole"], "Resource": "*"}, + ], + } + access_key = create_user_with_access_key_and_inline_policy( + user_name, inline_policy_document + ) + client = boto3.client( + "sts", + region_name="us-east-1", + aws_access_key_id=access_key["AccessKeyId"], + aws_secret_access_key=access_key["SecretAccessKey"], + ) + with pytest.raises(ClientError) as ex: + client.assume_role( + RoleArn=forbidden_role_arn, RoleSessionName=role_session_name + ) + assert ex.value.response["Error"]["Code"] == "AccessDenied" + assert ex.value.response["ResponseMetadata"]["HTTPStatusCode"] == 403 + assert ( + ex.value.response["Error"]["Message"] + == f"User: arn:aws:iam::{ACCOUNT_ID}:user/{user_name} is not authorized to perform: sts:AssumeRole" + ) + # Not raising means success + client.assume_role(RoleArn=allowed_role_arn, RoleSessionName=role_session_name) + + @set_initial_no_auth_action_count(3) @mock_ec2 def test_access_denied_for_run_instances(): diff --git a/tests/test_s3/test_s3_bucket_policy.py b/tests/test_s3/test_s3_bucket_policy.py index 1b7bbea62..8a910c61c 100644 --- a/tests/test_s3/test_s3_bucket_policy.py +++ b/tests/test_s3/test_s3_bucket_policy.py @@ -34,15 +34,25 @@ class TestBucketPolicy: def teardown_class(cls): cls.server.stop() + xfail_reason = "S3 logic for resource-based policy is not yet correctly implemented, see https://github.com/getmoto/moto/pull/6799#issuecomment-1712799688" + @pytest.mark.parametrize( "kwargs,status", [ ({}, 200), ({"resource": "arn:aws:s3:::mybucket/test_txt"}, 200), - ({"resource": "arn:aws:s3:::notmybucket/*"}, 403), - ({"resource": "arn:aws:s3:::mybucket/other*"}, 403), + pytest.param( + {"resource": "arn:aws:s3:::notmybucket/*"}, + 403, + marks=pytest.mark.xfail(reason=xfail_reason), + ), + pytest.param( + {"resource": "arn:aws:s3:::mybucket/other*"}, + 403, + marks=pytest.mark.xfail(reason=xfail_reason), + ), ({"resource": ["arn:aws:s3:::mybucket", "arn:aws:s3:::mybucket/*"]}, 200), - ( + pytest.param( { "resource": [ "arn:aws:s3:::notmybucket", @@ -50,12 +60,16 @@ class TestBucketPolicy: ] }, 403, + marks=pytest.mark.xfail(reason=xfail_reason), ), - ( + pytest.param( {"resource": ["arn:aws:s3:::mybucket", "arn:aws:s3:::notmybucket/*"]}, 403, + marks=pytest.mark.xfail(reason=xfail_reason), + ), + pytest.param( + {"effect": "Deny"}, 403, marks=pytest.mark.xfail(reason=xfail_reason) ), - ({"effect": "Deny"}, 403), ], ) def test_block_or_allow_get_object(self, kwargs, status):