Merge pull request #2300 from acsbendi/master

Basic IAM authentication, authorization
This commit is contained in:
Steve Pulec 2019-07-28 17:22:52 -05:00 committed by GitHub
commit 69d86cbd54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1460 additions and 50 deletions

View File

@ -255,6 +255,47 @@ def test_my_model_save():
mock.stop()
```
## IAM-like Access Control
Moto also has the ability to authenticate and authorize actions, just like it's done by IAM in AWS. This functionality can be enabled by either setting the `INITIAL_NO_AUTH_ACTION_COUNT` environment variable or using the `set_initial_no_auth_action_count` decorator. Note that the current implementation is very basic, see [this file](https://github.com/spulec/moto/blob/master/moto/core/access_control.py) for more information.
### `INITIAL_NO_AUTH_ACTION_COUNT`
If this environment variable is set, moto will skip performing any authentication as many times as the variable's value, and only starts authenticating requests afterwards. If it is not set, it defaults to infinity, thus moto will never perform any authentication at all.
### `set_initial_no_auth_action_count`
This is a decorator that works similarly to the environment variable, but the settings are only valid in the function's scope. When the function returns, everything is restored.
```python
@set_initial_no_auth_action_count(4)
@mock_ec2
def test_describe_instances_allowed():
policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:Describe*",
"Resource": "*"
}
]
}
access_key = ...
# create access key for an IAM user/assumed role that has the policy above.
# this part should call __exactly__ 4 AWS actions, so that authentication and authorization starts exactly after this
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
# if the IAM principal whose access key is used, does not have the permission to describe instances, this will fail
instances = client.describe_instances()['Reservations'][0]['Instances']
assert len(instances) == 0
```
See [the related test suite](https://github.com/spulec/moto/blob/master/tests/test_core/test_auth.py) for more examples.
## Stand-alone Server Mode
Moto also has a stand-alone server mode. This allows you to utilize

View File

@ -1,4 +1,7 @@
from __future__ import unicode_literals
from .models import BaseModel, BaseBackend, moto_api_backend # flake8: noqa
from .responses import ActionAuthenticatorMixin
moto_api_backends = {"global": moto_api_backend}
set_initial_no_auth_action_count = ActionAuthenticatorMixin.set_initial_no_auth_action_count

363
moto/core/access_control.py Normal file
View File

@ -0,0 +1,363 @@
"""
This implementation is NOT complete, there are many things to improve.
The following is a list of the most important missing features and inaccuracies.
TODO add support for more principals, apart from IAM users and assumed IAM roles
TODO add support for the Resource and Condition parts of IAM policies
TODO add support and create tests for all services in moto (for example, API Gateway is probably not supported currently)
TODO implement service specific error messages (currently, EC2 and S3 are supported separately, everything else defaults to the errors IAM returns)
TODO include information about the action's resource in error messages (once the Resource element in IAM policies is supported)
TODO check all other actions that are performed by the action called by the user (for example, autoscaling:CreateAutoScalingGroup requires permission for iam:CreateServiceLinkedRole too - see https://docs.aws.amazon.com/autoscaling/ec2/userguide/control-access-using-iam.html)
TODO add support for resource-based policies
"""
import json
import logging
import re
from abc import abstractmethod, ABCMeta
from enum import Enum
import six
from botocore.auth import SigV4Auth, S3SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from six import string_types
from moto.iam.models import ACCOUNT_ID, Policy
from moto.iam import iam_backend
from moto.core.exceptions import SignatureDoesNotMatchError, AccessDeniedError, InvalidClientTokenIdError, AuthFailureError
from moto.s3.exceptions import (
BucketAccessDeniedError,
S3AccessDeniedError,
BucketInvalidTokenError,
S3InvalidTokenError,
S3InvalidAccessKeyIdError,
BucketInvalidAccessKeyIdError,
BucketSignatureDoesNotMatchError,
S3SignatureDoesNotMatchError
)
from moto.sts import sts_backend
log = logging.getLogger(__name__)
def create_access_key(access_key_id, headers):
if access_key_id.startswith("AKIA") or "X-Amz-Security-Token" not in headers:
return IAMUserAccessKey(access_key_id, headers)
else:
return AssumedRoleAccessKey(access_key_id, headers)
class IAMUserAccessKey(object):
def __init__(self, access_key_id, headers):
iam_users = iam_backend.list_users('/', None, None)
for iam_user in iam_users:
for access_key in iam_user.access_keys:
if access_key.access_key_id == access_key_id:
self._owner_user_name = iam_user.name
self._access_key_id = access_key_id
self._secret_access_key = access_key.secret_access_key
if "X-Amz-Security-Token" in headers:
raise CreateAccessKeyFailure(reason="InvalidToken")
return
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self):
return "arn:aws:iam::{account_id}:user/{iam_user_name}".format(
account_id=ACCOUNT_ID,
iam_user_name=self._owner_user_name
)
def create_credentials(self):
return Credentials(self._access_key_id, self._secret_access_key)
def collect_policies(self):
user_policies = []
inline_policy_names = iam_backend.list_user_policies(self._owner_user_name)
for inline_policy_name in inline_policy_names:
inline_policy = iam_backend.get_user_policy(self._owner_user_name, inline_policy_name)
user_policies.append(inline_policy)
attached_policies, _ = iam_backend.list_attached_user_policies(self._owner_user_name)
user_policies += attached_policies
user_groups = iam_backend.get_groups_for_user(self._owner_user_name)
for user_group in user_groups:
inline_group_policy_names = iam_backend.list_group_policies(user_group.name)
for inline_group_policy_name in inline_group_policy_names:
inline_user_group_policy = iam_backend.get_group_policy(user_group.name, inline_group_policy_name)
user_policies.append(inline_user_group_policy)
attached_group_policies, _ = iam_backend.list_attached_group_policies(user_group.name)
user_policies += attached_group_policies
return user_policies
class AssumedRoleAccessKey(object):
def __init__(self, access_key_id, headers):
for assumed_role in sts_backend.assumed_roles:
if assumed_role.access_key_id == access_key_id:
self._access_key_id = access_key_id
self._secret_access_key = assumed_role.secret_access_key
self._session_token = assumed_role.session_token
self._owner_role_name = assumed_role.arn.split("/")[-1]
self._session_name = assumed_role.session_name
if headers["X-Amz-Security-Token"] != self._session_token:
raise CreateAccessKeyFailure(reason="InvalidToken")
return
raise CreateAccessKeyFailure(reason="InvalidId")
@property
def arn(self):
return "arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name}".format(
account_id=ACCOUNT_ID,
role_name=self._owner_role_name,
session_name=self._session_name
)
def create_credentials(self):
return Credentials(self._access_key_id, self._secret_access_key, self._session_token)
def collect_policies(self):
role_policies = []
inline_policy_names = iam_backend.list_role_policies(self._owner_role_name)
for inline_policy_name in inline_policy_names:
_, inline_policy = iam_backend.get_role_policy(self._owner_role_name, inline_policy_name)
role_policies.append(inline_policy)
attached_policies, _ = iam_backend.list_attached_role_policies(self._owner_role_name)
role_policies += attached_policies
return role_policies
class CreateAccessKeyFailure(Exception):
def __init__(self, reason, *args):
super(CreateAccessKeyFailure, self).__init__(*args)
self.reason = reason
@six.add_metaclass(ABCMeta)
class IAMRequestBase(object):
def __init__(self, method, path, data, headers):
log.debug("Creating {class_name} with method={method}, path={path}, data={data}, headers={headers}".format(
class_name=self.__class__.__name__, method=method, path=path, data=data, headers=headers))
self._method = method
self._path = path
self._data = data
self._headers = headers
credential_scope = self._get_string_between('Credential=', ',', self._headers['Authorization'])
credential_data = credential_scope.split('/')
self._region = credential_data[2]
self._service = credential_data[3]
self._action = self._service + ":" + (self._data["Action"][0] if isinstance(self._data["Action"], list) else self._data["Action"])
try:
self._access_key = create_access_key(access_key_id=credential_data[0], headers=headers)
except CreateAccessKeyFailure as e:
self._raise_invalid_access_key(e.reason)
def check_signature(self):
original_signature = self._get_string_between('Signature=', ',', self._headers['Authorization'])
calculated_signature = self._calculate_signature()
if original_signature != calculated_signature:
self._raise_signature_does_not_match()
def check_action_permitted(self):
policies = self._access_key.collect_policies()
permitted = False
for policy in policies:
iam_policy = IAMPolicy(policy)
permission_result = iam_policy.is_action_permitted(self._action)
if permission_result == PermissionResult.DENIED:
self._raise_access_denied()
elif permission_result == PermissionResult.PERMITTED:
permitted = True
if not permitted:
self._raise_access_denied()
@abstractmethod
def _raise_signature_does_not_match(self):
raise NotImplementedError()
@abstractmethod
def _raise_access_denied(self):
raise NotImplementedError()
@abstractmethod
def _raise_invalid_access_key(self, reason):
raise NotImplementedError()
@abstractmethod
def _create_auth(self, credentials):
raise NotImplementedError()
@staticmethod
def _create_headers_for_aws_request(signed_headers, original_headers):
headers = {}
for key, value in original_headers.items():
if key.lower() in signed_headers:
headers[key] = value
return headers
def _create_aws_request(self):
signed_headers = self._get_string_between('SignedHeaders=', ',', self._headers['Authorization']).split(';')
headers = self._create_headers_for_aws_request(signed_headers, self._headers)
request = AWSRequest(method=self._method, url=self._path, data=self._data, headers=headers)
request.context['timestamp'] = headers['X-Amz-Date']
return request
def _calculate_signature(self):
credentials = self._access_key.create_credentials()
auth = self._create_auth(credentials)
request = self._create_aws_request()
canonical_request = auth.canonical_request(request)
string_to_sign = auth.string_to_sign(request, canonical_request)
return auth.signature(string_to_sign, request)
@staticmethod
def _get_string_between(first_separator, second_separator, string):
return string.partition(first_separator)[2].partition(second_separator)[0]
class IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self):
if self._service == "ec2":
raise AuthFailureError()
else:
raise SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, _):
if self._service == "ec2":
raise AuthFailureError()
else:
raise InvalidClientTokenIdError()
def _create_auth(self, credentials):
return SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self):
raise AccessDeniedError(
user_arn=self._access_key.arn,
action=self._action
)
class S3IAMRequest(IAMRequestBase):
def _raise_signature_does_not_match(self):
if "BucketName" in self._data:
raise BucketSignatureDoesNotMatchError(bucket=self._data["BucketName"])
else:
raise S3SignatureDoesNotMatchError()
def _raise_invalid_access_key(self, reason):
if reason == "InvalidToken":
if "BucketName" in self._data:
raise BucketInvalidTokenError(bucket=self._data["BucketName"])
else:
raise S3InvalidTokenError()
else:
if "BucketName" in self._data:
raise BucketInvalidAccessKeyIdError(bucket=self._data["BucketName"])
else:
raise S3InvalidAccessKeyIdError()
def _create_auth(self, credentials):
return S3SigV4Auth(credentials, self._service, self._region)
def _raise_access_denied(self):
if "BucketName" in self._data:
raise BucketAccessDeniedError(bucket=self._data["BucketName"])
else:
raise S3AccessDeniedError()
class IAMPolicy(object):
def __init__(self, policy):
if isinstance(policy, Policy):
default_version = next(policy_version for policy_version in policy.versions if policy_version.is_default)
policy_document = default_version.document
elif isinstance(policy, string_types):
policy_document = policy
else:
policy_document = policy["policy_document"]
self._policy_json = json.loads(policy_document)
def is_action_permitted(self, action):
permitted = False
if isinstance(self._policy_json["Statement"], list):
for policy_statement in self._policy_json["Statement"]:
iam_policy_statement = IAMPolicyStatement(policy_statement)
permission_result = iam_policy_statement.is_action_permitted(action)
if permission_result == PermissionResult.DENIED:
return permission_result
elif permission_result == PermissionResult.PERMITTED:
permitted = True
else: # dict
iam_policy_statement = IAMPolicyStatement(self._policy_json["Statement"])
return iam_policy_statement.is_action_permitted(action)
if permitted:
return PermissionResult.PERMITTED
else:
return PermissionResult.NEUTRAL
class IAMPolicyStatement(object):
def __init__(self, statement):
self._statement = statement
def is_action_permitted(self, action):
is_action_concerned = False
if "NotAction" in self._statement:
if not self._check_element_matches("NotAction", action):
is_action_concerned = True
else: # Action is present
if self._check_element_matches("Action", action):
is_action_concerned = True
if is_action_concerned:
if self._statement["Effect"] == "Allow":
return PermissionResult.PERMITTED
else: # Deny
return PermissionResult.DENIED
else:
return PermissionResult.NEUTRAL
def _check_element_matches(self, statement_element, value):
if isinstance(self._statement[statement_element], list):
for statement_element_value in self._statement[statement_element]:
if self._match(statement_element_value, value):
return True
return False
else: # string
return self._match(self._statement[statement_element], value)
@staticmethod
def _match(pattern, string):
pattern = pattern.replace("*", ".*")
pattern = "^{pattern}$".format(pattern=pattern)
return re.match(pattern, string)
class PermissionResult(Enum):
PERMITTED = 1
DENIED = 2
NEUTRAL = 3

View File

@ -65,3 +65,42 @@ class JsonRESTError(RESTError):
def get_body(self, *args, **kwargs):
return self.description
class SignatureDoesNotMatchError(RESTError):
code = 403
def __init__(self):
super(SignatureDoesNotMatchError, self).__init__(
'SignatureDoesNotMatch',
"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.")
class InvalidClientTokenIdError(RESTError):
code = 403
def __init__(self):
super(InvalidClientTokenIdError, self).__init__(
'InvalidClientTokenId',
"The security token included in the request is invalid.")
class AccessDeniedError(RESTError):
code = 403
def __init__(self, user_arn, action):
super(AccessDeniedError, self).__init__(
'AccessDenied',
"User: {user_arn} is not authorized to perform: {operation}".format(
user_arn=user_arn,
operation=action
))
class AuthFailureError(RESTError):
code = 401
def __init__(self):
super(AuthFailureError, self).__init__(
'AuthFailure',
"AWS was not able to validate the provided access credentials")

View File

@ -1,13 +1,17 @@
from __future__ import unicode_literals
import functools
from collections import defaultdict
import datetime
import json
import logging
import re
import io
import requests
import pytz
from moto.core.access_control import IAMRequest, S3IAMRequest
from moto.core.exceptions import DryRunClientError
from jinja2 import Environment, DictLoader, TemplateNotFound
@ -22,7 +26,7 @@ from werkzeug.exceptions import HTTPException
import boto3
from moto.compat import OrderedDict
from moto.core.utils import camelcase_to_underscores, method_names_from_class
from moto import settings
log = logging.getLogger(__name__)
@ -103,7 +107,54 @@ class _TemplateEnvironmentMixin(object):
return self.environment.get_template(template_id)
class BaseResponse(_TemplateEnvironmentMixin):
class ActionAuthenticatorMixin(object):
request_count = 0
def _authenticate_and_authorize_action(self, iam_request_cls):
if ActionAuthenticatorMixin.request_count >= settings.INITIAL_NO_AUTH_ACTION_COUNT:
iam_request = iam_request_cls(method=self.method, path=self.path, data=self.data, headers=self.headers)
iam_request.check_signature()
iam_request.check_action_permitted()
else:
ActionAuthenticatorMixin.request_count += 1
def _authenticate_and_authorize_normal_action(self):
self._authenticate_and_authorize_action(IAMRequest)
def _authenticate_and_authorize_s3_action(self):
self._authenticate_and_authorize_action(S3IAMRequest)
@staticmethod
def set_initial_no_auth_action_count(initial_no_auth_action_count):
def decorator(function):
def wrapper(*args, **kwargs):
if settings.TEST_SERVER_MODE:
response = requests.post("http://localhost:5000/moto-api/reset-auth", data=str(initial_no_auth_action_count).encode())
original_initial_no_auth_action_count = response.json()['PREVIOUS_INITIAL_NO_AUTH_ACTION_COUNT']
else:
original_initial_no_auth_action_count = settings.INITIAL_NO_AUTH_ACTION_COUNT
original_request_count = ActionAuthenticatorMixin.request_count
settings.INITIAL_NO_AUTH_ACTION_COUNT = initial_no_auth_action_count
ActionAuthenticatorMixin.request_count = 0
try:
result = function(*args, **kwargs)
finally:
if settings.TEST_SERVER_MODE:
requests.post("http://localhost:5000/moto-api/reset-auth", data=str(original_initial_no_auth_action_count).encode())
else:
ActionAuthenticatorMixin.request_count = original_request_count
settings.INITIAL_NO_AUTH_ACTION_COUNT = original_initial_no_auth_action_count
return result
functools.update_wrapper(wrapper, function)
wrapper.__wrapped__ = function
return wrapper
return decorator
class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
default_region = 'us-east-1'
# to extract region, use [^.]
@ -167,6 +218,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
self.uri = full_url
self.path = urlparse(full_url).path
self.querystring = querystring
self.data = querystring
self.method = request.method
self.region = self.get_region_from_url(request, full_url)
self.uri_match = None
@ -273,6 +325,13 @@ class BaseResponse(_TemplateEnvironmentMixin):
def call_action(self):
headers = self.response_headers
try:
self._authenticate_and_authorize_normal_action()
except HTTPException as http_error:
response = http_error.description, dict(status=http_error.code)
return self._send_response(headers, response)
action = camelcase_to_underscores(self._get_action())
method_names = method_names_from_class(self.__class__)
if action in method_names:
@ -285,16 +344,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
if isinstance(response, six.string_types):
return 200, headers, response
else:
if len(response) == 2:
body, new_headers = response
else:
status, new_headers, body = response
status = new_headers.get('status', 200)
headers.update(new_headers)
# Cast status to string
if "status" in headers:
headers['status'] = str(headers['status'])
return status, headers, body
return self._send_response(headers, response)
if not action:
return 404, headers, ''
@ -302,6 +352,19 @@ class BaseResponse(_TemplateEnvironmentMixin):
raise NotImplementedError(
"The {0} action has not been implemented".format(action))
@staticmethod
def _send_response(headers, response):
if len(response) == 2:
body, new_headers = response
else:
status, new_headers, body = response
status = new_headers.get('status', 200)
headers.update(new_headers)
# Cast status to string
if "status" in headers:
headers['status'] = str(headers['status'])
return status, headers, body
def _get_param(self, param_name, if_none=None):
val = self.querystring.get(param_name)
if val is not None:
@ -569,6 +632,14 @@ class MotoAPIResponse(BaseResponse):
return 200, {}, json.dumps({"status": "ok"})
return 400, {}, json.dumps({"Error": "Need to POST to reset Moto"})
def reset_auth_response(self, request, full_url, headers):
if request.method == "POST":
previous_initial_no_auth_action_count = settings.INITIAL_NO_AUTH_ACTION_COUNT
settings.INITIAL_NO_AUTH_ACTION_COUNT = float(request.data.decode())
ActionAuthenticatorMixin.request_count = 0
return 200, {}, json.dumps({"status": "ok", "PREVIOUS_INITIAL_NO_AUTH_ACTION_COUNT": str(previous_initial_no_auth_action_count)})
return 400, {}, json.dumps({"Error": "Need to POST to reset Moto Auth"})
def model_data(self, request, full_url, headers):
from moto.core.models import model_data

View File

@ -11,4 +11,5 @@ url_paths = {
'{0}/moto-api/$': response_instance.dashboard,
'{0}/moto-api/data.json': response_instance.model_data,
'{0}/moto-api/reset': response_instance.reset_response,
'{0}/moto-api/reset-auth': response_instance.reset_auth_response,
}

View File

@ -1248,5 +1248,13 @@ class IAMBackend(BaseBackend):
return saml_provider
raise IAMNotFoundException("SamlProvider {0} not found".format(saml_provider_arn))
def get_user_from_access_key_id(self, access_key_id):
for user_name, user in self.users.items():
access_keys = self.get_all_access_keys(user_name)
for access_key in access_keys:
if access_key.access_key_id == access_key_id:
return user
return None
iam_backend = IAMBackend()

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import iam_backend, User
@ -425,11 +426,13 @@ class IamResponse(BaseResponse):
def get_user(self):
user_name = self._get_param('UserName')
if user_name:
user = iam_backend.get_user(user_name)
if not user_name:
access_key_id = self.get_current_user()
user = iam_backend.get_user_from_access_key_id(access_key_id)
if user is None:
user = User("default_user")
else:
user = User(name='default_user')
# If no user is specific, IAM returns the current user
user = iam_backend.get_user(user_name)
template = self.response_template(USER_TEMPLATE)
return template.render(action='Get', user=user)
@ -457,7 +460,6 @@ class IamResponse(BaseResponse):
def create_login_profile(self):
user_name = self._get_param('UserName')
password = self._get_param('Password')
password = self._get_param('Password')
user = iam_backend.create_login_profile(user_name, password)
template = self.response_template(CREATE_LOGIN_PROFILE_TEMPLATE)

View File

@ -199,3 +199,67 @@ class DuplicateTagKeys(S3ClientError):
"InvalidTag",
"Cannot provide multiple Tags with the same key",
*args, **kwargs)
class S3AccessDeniedError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
super(S3AccessDeniedError, self).__init__('AccessDenied', 'Access Denied', *args, **kwargs)
class BucketAccessDeniedError(BucketError):
code = 403
def __init__(self, *args, **kwargs):
super(BucketAccessDeniedError, self).__init__('AccessDenied', 'Access Denied', *args, **kwargs)
class S3InvalidTokenError(S3ClientError):
code = 400
def __init__(self, *args, **kwargs):
super(S3InvalidTokenError, self).__init__('InvalidToken', 'The provided token is malformed or otherwise invalid.', *args, **kwargs)
class BucketInvalidTokenError(BucketError):
code = 400
def __init__(self, *args, **kwargs):
super(BucketInvalidTokenError, self).__init__('InvalidToken', 'The provided token is malformed or otherwise invalid.', *args, **kwargs)
class S3InvalidAccessKeyIdError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
super(S3InvalidAccessKeyIdError, self).__init__(
'InvalidAccessKeyId',
"The AWS Access Key Id you provided does not exist in our records.", *args, **kwargs)
class BucketInvalidAccessKeyIdError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
super(BucketInvalidAccessKeyIdError, self).__init__(
'InvalidAccessKeyId',
"The AWS Access Key Id you provided does not exist in our records.", *args, **kwargs)
class S3SignatureDoesNotMatchError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
super(S3SignatureDoesNotMatchError, self).__init__(
'SignatureDoesNotMatch',
"The request signature we calculated does not match the signature you provided. Check your key and signing method.", *args, **kwargs)
class BucketSignatureDoesNotMatchError(S3ClientError):
code = 403
def __init__(self, *args, **kwargs):
super(BucketSignatureDoesNotMatchError, self).__init__(
'SignatureDoesNotMatch',
"The request signature we calculated does not match the signature you provided. Check your key and signing method.", *args, **kwargs)

View File

@ -3,13 +3,14 @@ from __future__ import unicode_literals
import re
import six
from moto.core.utils import str_to_rfc_1123_datetime
from six.moves.urllib.parse import parse_qs, urlparse, unquote
import xmltodict
from moto.packages.httpretty.core import HTTPrettyRequest
from moto.core.responses import _TemplateEnvironmentMixin
from moto.core.responses import _TemplateEnvironmentMixin, ActionAuthenticatorMixin
from moto.core.utils import path_url
from moto.s3bucket_path.utils import bucket_name_from_url as bucketpath_bucket_name_from_url, \
@ -25,6 +26,72 @@ from xml.dom import minidom
DEFAULT_REGION_NAME = 'us-east-1'
ACTION_MAP = {
"BUCKET": {
"GET": {
"uploads": "ListBucketMultipartUploads",
"location": "GetBucketLocation",
"lifecycle": "GetLifecycleConfiguration",
"versioning": "GetBucketVersioning",
"policy": "GetBucketPolicy",
"website": "GetBucketWebsite",
"acl": "GetBucketAcl",
"tagging": "GetBucketTagging",
"logging": "GetBucketLogging",
"cors": "GetBucketCORS",
"notification": "GetBucketNotification",
"accelerate": "GetAccelerateConfiguration",
"versions": "ListBucketVersions",
"DEFAULT": "ListBucket"
},
"PUT": {
"lifecycle": "PutLifecycleConfiguration",
"versioning": "PutBucketVersioning",
"policy": "PutBucketPolicy",
"website": "PutBucketWebsite",
"acl": "PutBucketAcl",
"tagging": "PutBucketTagging",
"logging": "PutBucketLogging",
"cors": "PutBucketCORS",
"notification": "PutBucketNotification",
"accelerate": "PutAccelerateConfiguration",
"DEFAULT": "CreateBucket"
},
"DELETE": {
"lifecycle": "PutLifecycleConfiguration",
"policy": "DeleteBucketPolicy",
"tagging": "PutBucketTagging",
"cors": "PutBucketCORS",
"DEFAULT": "DeleteBucket"
}
},
"KEY": {
"GET": {
"uploadId": "ListMultipartUploadParts",
"acl": "GetObjectAcl",
"tagging": "GetObjectTagging",
"versionId": "GetObjectVersion",
"DEFAULT": "GetObject"
},
"PUT": {
"acl": "PutObjectAcl",
"tagging": "PutObjectTagging",
"DEFAULT": "PutObject"
},
"DELETE": {
"uploadId": "AbortMultipartUpload",
"versionId": "DeleteObjectVersion",
"DEFAULT": " DeleteObject"
},
"POST": {
"uploads": "PutObject",
"restore": "RestoreObject",
"uploadId": "PutObject"
}
}
}
def parse_key_name(pth):
return pth.lstrip("/")
@ -37,17 +104,24 @@ def is_delete_keys(request, path, bucket_name):
)
class ResponseObject(_TemplateEnvironmentMixin):
class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
def __init__(self, backend):
super(ResponseObject, self).__init__()
self.backend = backend
self.method = ""
self.path = ""
self.data = {}
self.headers = {}
@property
def should_autoescape(self):
return True
def all_buckets(self):
self.data["Action"] = "ListAllMyBuckets"
self._authenticate_and_authorize_s3_action()
# No bucket specified. Listing all buckets
all_buckets = self.backend.get_all_buckets()
template = self.response_template(S3_ALL_BUCKETS)
@ -112,11 +186,20 @@ class ResponseObject(_TemplateEnvironmentMixin):
return self.bucket_response(request, full_url, headers)
def bucket_response(self, request, full_url, headers):
self.method = request.method
self.path = self._get_path(request)
self.headers = request.headers
if 'host' not in self.headers:
self.headers['host'] = urlparse(full_url).netloc
try:
response = self._bucket_response(request, full_url, headers)
except S3ClientError as s3error:
response = s3error.code, {}, s3error.description
return self._send_response(response)
@staticmethod
def _send_response(response):
if isinstance(response, six.string_types):
return 200, {}, response.encode("utf-8")
else:
@ -127,8 +210,7 @@ class ResponseObject(_TemplateEnvironmentMixin):
return status_code, headers, response_content
def _bucket_response(self, request, full_url, headers):
parsed_url = urlparse(full_url)
querystring = parse_qs(parsed_url.query, keep_blank_values=True)
querystring = self._get_querystring(full_url)
method = request.method
region_name = parse_region_from_url(full_url)
@ -137,6 +219,8 @@ class ResponseObject(_TemplateEnvironmentMixin):
# If no bucket specified, list all buckets
return self.all_buckets()
self.data["BucketName"] = bucket_name
if hasattr(request, 'body'):
# Boto
body = request.body
@ -150,20 +234,26 @@ class ResponseObject(_TemplateEnvironmentMixin):
body = u'{0}'.format(body).encode('utf-8')
if method == 'HEAD':
return self._bucket_response_head(bucket_name, headers)
return self._bucket_response_head(bucket_name)
elif method == 'GET':
return self._bucket_response_get(bucket_name, querystring, headers)
return self._bucket_response_get(bucket_name, querystring)
elif method == 'PUT':
return self._bucket_response_put(request, body, region_name, bucket_name, querystring, headers)
return self._bucket_response_put(request, body, region_name, bucket_name, querystring)
elif method == 'DELETE':
return self._bucket_response_delete(body, bucket_name, querystring, headers)
return self._bucket_response_delete(body, bucket_name, querystring)
elif method == 'POST':
return self._bucket_response_post(request, body, bucket_name, headers)
return self._bucket_response_post(request, body, bucket_name)
else:
raise NotImplementedError(
"Method {0} has not been impelemented in the S3 backend yet".format(method))
def _bucket_response_head(self, bucket_name, headers):
@staticmethod
def _get_querystring(full_url):
parsed_url = urlparse(full_url)
querystring = parse_qs(parsed_url.query, keep_blank_values=True)
return querystring
def _bucket_response_head(self, bucket_name):
try:
self.backend.get_bucket(bucket_name)
except MissingBucket:
@ -174,7 +264,10 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 404, {}, ""
return 200, {}, ""
def _bucket_response_get(self, bucket_name, querystring, headers):
def _bucket_response_get(self, bucket_name, querystring):
self._set_action("BUCKET", "GET", querystring)
self._authenticate_and_authorize_s3_action()
if 'uploads' in querystring:
for unsup in ('delimiter', 'max-uploads'):
if unsup in querystring:
@ -333,6 +426,15 @@ class ResponseObject(_TemplateEnvironmentMixin):
max_keys=max_keys
)
def _set_action(self, action_resource_type, method, querystring):
action_set = False
for action_in_querystring, action in ACTION_MAP[action_resource_type][method].items():
if action_in_querystring in querystring:
self.data["Action"] = action
action_set = True
if not action_set:
self.data["Action"] = ACTION_MAP[action_resource_type][method]["DEFAULT"]
def _handle_list_objects_v2(self, bucket_name, querystring):
template = self.response_template(S3_BUCKET_GET_RESPONSE_V2)
bucket = self.backend.get_bucket(bucket_name)
@ -393,9 +495,13 @@ class ResponseObject(_TemplateEnvironmentMixin):
next_continuation_token = None
return result_keys, is_truncated, next_continuation_token
def _bucket_response_put(self, request, body, region_name, bucket_name, querystring, headers):
def _bucket_response_put(self, request, body, region_name, bucket_name, querystring):
if not request.headers.get('Content-Length'):
return 411, {}, "Content-Length required"
self._set_action("BUCKET", "PUT", querystring)
self._authenticate_and_authorize_s3_action()
if 'versioning' in querystring:
ver = re.search('<Status>([A-Za-z]+)</Status>', body.decode())
if ver:
@ -494,7 +600,10 @@ class ResponseObject(_TemplateEnvironmentMixin):
template = self.response_template(S3_BUCKET_CREATE_RESPONSE)
return 200, {}, template.render(bucket=new_bucket)
def _bucket_response_delete(self, body, bucket_name, querystring, headers):
def _bucket_response_delete(self, body, bucket_name, querystring):
self._set_action("BUCKET", "DELETE", querystring)
self._authenticate_and_authorize_s3_action()
if 'policy' in querystring:
self.backend.delete_bucket_policy(bucket_name, body)
return 204, {}, ""
@ -521,17 +630,20 @@ class ResponseObject(_TemplateEnvironmentMixin):
S3_DELETE_BUCKET_WITH_ITEMS_ERROR)
return 409, {}, template.render(bucket=removed_bucket)
def _bucket_response_post(self, request, body, bucket_name, headers):
def _bucket_response_post(self, request, body, bucket_name):
if not request.headers.get('Content-Length'):
return 411, {}, "Content-Length required"
if isinstance(request, HTTPrettyRequest):
path = request.path
else:
path = request.full_path if hasattr(request, 'full_path') else path_url(request.url)
path = self._get_path(request)
if self.is_delete_keys(request, path, bucket_name):
return self._bucket_response_delete_keys(request, body, bucket_name, headers)
self.data["Action"] = "DeleteObject"
self._authenticate_and_authorize_s3_action()
return self._bucket_response_delete_keys(request, body, bucket_name)
self.data["Action"] = "PutObject"
self._authenticate_and_authorize_s3_action()
# POST to bucket-url should create file from form
if hasattr(request, 'form'):
@ -560,7 +672,15 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 200, {}, ""
def _bucket_response_delete_keys(self, request, body, bucket_name, headers):
@staticmethod
def _get_path(request):
if isinstance(request, HTTPrettyRequest):
path = request.path
else:
path = request.full_path if hasattr(request, 'full_path') else path_url(request.url)
return path
def _bucket_response_delete_keys(self, request, body, bucket_name):
template = self.response_template(S3_DELETE_KEYS_RESPONSE)
keys = minidom.parseString(body).getElementsByTagName('Key')
@ -606,6 +726,11 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 206, response_headers, response_content[begin:end + 1]
def key_response(self, request, full_url, headers):
self.method = request.method
self.path = self._get_path(request)
self.headers = request.headers
if 'host' not in self.headers:
self.headers['host'] = urlparse(full_url).netloc
response_headers = {}
try:
response = self._key_response(request, full_url, headers)
@ -665,14 +790,17 @@ class ResponseObject(_TemplateEnvironmentMixin):
elif method == 'HEAD':
return self._key_response_head(bucket_name, query, key_name, headers=request.headers)
elif method == 'DELETE':
return self._key_response_delete(bucket_name, query, key_name, headers)
return self._key_response_delete(bucket_name, query, key_name)
elif method == 'POST':
return self._key_response_post(request, body, bucket_name, query, key_name, headers)
return self._key_response_post(request, body, bucket_name, query, key_name)
else:
raise NotImplementedError(
"Method {0} has not been implemented in the S3 backend yet".format(method))
def _key_response_get(self, bucket_name, query, key_name, headers):
self._set_action("KEY", "GET", query)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if query.get('uploadId'):
upload_id = query['uploadId'][0]
@ -707,6 +835,9 @@ class ResponseObject(_TemplateEnvironmentMixin):
return 200, response_headers, key.value
def _key_response_put(self, request, body, bucket_name, query, key_name, headers):
self._set_action("KEY", "PUT", query)
self._authenticate_and_authorize_s3_action()
response_headers = {}
if query.get('uploadId') and query.get('partNumber'):
upload_id = query['uploadId'][0]
@ -1080,7 +1211,10 @@ class ResponseObject(_TemplateEnvironmentMixin):
config = parsed_xml['AccelerateConfiguration']
return config['Status']
def _key_response_delete(self, bucket_name, query, key_name, headers):
def _key_response_delete(self, bucket_name, query, key_name):
self._set_action("KEY", "DELETE", query)
self._authenticate_and_authorize_s3_action()
if query.get('uploadId'):
upload_id = query['uploadId'][0]
self.backend.cancel_multipart(bucket_name, upload_id)
@ -1100,7 +1234,10 @@ class ResponseObject(_TemplateEnvironmentMixin):
raise InvalidPartOrder()
yield (pn, p.getElementsByTagName('ETag')[0].firstChild.wholeText)
def _key_response_post(self, request, body, bucket_name, query, key_name, headers):
def _key_response_post(self, request, body, bucket_name, query, key_name):
self._set_action("KEY", "POST", query)
self._authenticate_and_authorize_s3_action()
if body == b'' and 'uploads' in query:
metadata = metadata_from_headers(request.headers)
multipart = self.backend.initiate_multipart(

View File

@ -7,15 +7,6 @@ url_bases = [
r"https?://(?P<bucket_name>[a-zA-Z0-9\-_.]*)\.?s3(.*).amazonaws.com"
]
def ambiguous_response1(*args, **kwargs):
return S3ResponseInstance.ambiguous_response(*args, **kwargs)
def ambiguous_response2(*args, **kwargs):
return S3ResponseInstance.ambiguous_response(*args, **kwargs)
url_paths = {
# subdomain bucket
'{0}/$': S3ResponseInstance.bucket_response,

View File

@ -1,3 +1,4 @@
import os
TEST_SERVER_MODE = os.environ.get('TEST_SERVER_MODE', '0').lower() == 'true'
INITIAL_NO_AUTH_ACTION_COUNT = float(os.environ.get('INITIAL_NO_AUTH_ACTION_COUNT', float('inf')))

View File

@ -38,6 +38,9 @@ class AssumedRole(BaseModel):
class STSBackend(BaseBackend):
def __init__(self):
self.assumed_roles = []
def get_session_token(self, duration):
token = Token(duration=duration)
return token
@ -48,6 +51,7 @@ class STSBackend(BaseBackend):
def assume_role(self, **kwargs):
role = AssumedRole(**kwargs)
self.assumed_roles.append(role)
return role
def assume_role_with_web_identity(self, **kwargs):

View File

@ -0,0 +1,685 @@
import json
import boto3
import sure # noqa
from botocore.exceptions import ClientError
# Ensure 'assert_raises' context manager support for Python 2.6
import tests.backport_assert_raises
from nose.tools import assert_raises
from moto import mock_iam, mock_ec2, mock_s3, mock_sts, mock_elbv2, mock_rds2
from moto.core import set_initial_no_auth_action_count
from moto.iam.models import ACCOUNT_ID
@mock_iam
def create_user_with_access_key(user_name='test-user'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
return client.create_access_key(UserName=user_name)['AccessKey']
@mock_iam
def create_user_with_access_key_and_inline_policy(user_name, policy_document, policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
client.put_user_policy(UserName=user_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document))
return client.create_access_key(UserName=user_name)['AccessKey']
@mock_iam
def create_user_with_access_key_and_attached_policy(user_name, policy_document, policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
policy_arn = client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)['Policy']['Arn']
client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
return client.create_access_key(UserName=user_name)['AccessKey']
@mock_iam
def create_user_with_access_key_and_multiple_policies(user_name, inline_policy_document,
attached_policy_document, inline_policy_name='policy1', attached_policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_user(UserName=user_name)
policy_arn = client.create_policy(
PolicyName=attached_policy_name,
PolicyDocument=json.dumps(attached_policy_document)
)['Policy']['Arn']
client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)
client.put_user_policy(UserName=user_name, PolicyName=inline_policy_name, PolicyDocument=json.dumps(inline_policy_document))
return client.create_access_key(UserName=user_name)['AccessKey']
def create_group_with_attached_policy_and_add_user(user_name, policy_document,
group_name='test-group', policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_group(GroupName=group_name)
policy_arn = client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)['Policy']['Arn']
client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
client.add_user_to_group(GroupName=group_name, UserName=user_name)
def create_group_with_inline_policy_and_add_user(user_name, policy_document,
group_name='test-group', policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_group(GroupName=group_name)
client.put_group_policy(
GroupName=group_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)
client.add_user_to_group(GroupName=group_name, UserName=user_name)
def create_group_with_multiple_policies_and_add_user(user_name, inline_policy_document,
attached_policy_document, group_name='test-group',
inline_policy_name='policy1', attached_policy_name='policy1'):
client = boto3.client('iam', region_name='us-east-1')
client.create_group(GroupName=group_name)
client.put_group_policy(
GroupName=group_name,
PolicyName=inline_policy_name,
PolicyDocument=json.dumps(inline_policy_document)
)
policy_arn = client.create_policy(
PolicyName=attached_policy_name,
PolicyDocument=json.dumps(attached_policy_document)
)['Policy']['Arn']
client.attach_group_policy(GroupName=group_name, PolicyArn=policy_arn)
client.add_user_to_group(GroupName=group_name, UserName=user_name)
@mock_iam
@mock_sts
def create_role_with_attached_policy_and_assume_it(role_name, trust_policy_document,
policy_document, session_name='session1', policy_name='policy1'):
iam_client = boto3.client('iam', region_name='us-east-1')
sts_client = boto3.client('sts', region_name='us-east-1')
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy_document)
)['Role']['Arn']
policy_arn = iam_client.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)['Policy']['Arn']
iam_client.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)['Credentials']
@mock_iam
@mock_sts
def create_role_with_inline_policy_and_assume_it(role_name, trust_policy_document,
policy_document, session_name='session1', policy_name='policy1'):
iam_client = boto3.client('iam', region_name='us-east-1')
sts_client = boto3.client('sts', region_name='us-east-1')
role_arn = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(trust_policy_document)
)['Role']['Arn']
iam_client.put_role_policy(
RoleName=role_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(policy_document)
)
return sts_client.assume_role(RoleArn=role_arn, RoleSessionName=session_name)['Credentials']
@set_initial_no_auth_action_count(0)
@mock_iam
def test_invalid_client_token_id():
client = boto3.client('iam', region_name='us-east-1', aws_access_key_id='invalid', aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.get_user()
ex.exception.response['Error']['Code'].should.equal('InvalidClientTokenId')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The security token included in the request is invalid.')
@set_initial_no_auth_action_count(0)
@mock_ec2
def test_auth_failure():
client = boto3.client('ec2', region_name='us-east-1', aws_access_key_id='invalid', aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.describe_instances()
ex.exception.response['Error']['Code'].should.equal('AuthFailure')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(401)
ex.exception.response['Error']['Message'].should.equal('AWS was not able to validate the provided access credentials')
@set_initial_no_auth_action_count(2)
@mock_iam
def test_signature_does_not_match():
access_key = create_user_with_access_key()
client = boto3.client('iam', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.get_user()
ex.exception.response['Error']['Code'].should.equal('SignatureDoesNotMatch')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.')
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_auth_failure_with_valid_access_key_id():
access_key = create_user_with_access_key()
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.describe_instances()
ex.exception.response['Error']['Code'].should.equal('AuthFailure')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(401)
ex.exception.response['Error']['Message'].should.equal('AWS was not able to validate the provided access credentials')
@set_initial_no_auth_action_count(2)
@mock_ec2
def test_access_denied_with_no_policy():
user_name = 'test-user'
access_key = create_user_with_access_key(user_name)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.describe_instances()
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:DescribeInstances"
)
)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_not_allowing_policy():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:Describe*"
],
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.run_instances(MaxCount=1, MinCount=1)
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:RunInstances"
)
)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_access_denied_with_denying_policy():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ec2:*",
],
"Resource": "*"
},
{
"Effect": "Deny",
"Action": "ec2:CreateVpc",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.create_vpc(CidrBlock="10.0.0.0/16")
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:CreateVpc"
)
)
@set_initial_no_auth_action_count(3)
@mock_ec2
def test_allowed_with_wildcard_action():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:Describe*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.describe_tags()['Tags'].should.be.empty
@set_initial_no_auth_action_count(4)
@mock_iam
def test_allowed_with_explicit_action_in_attached_policy():
user_name = 'test-user'
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:ListGroups",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_attached_policy(user_name, attached_policy_document)
client = boto3.client('iam', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.list_groups()['Groups'].should.be.empty
@set_initial_no_auth_action_count(8)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_attached_group_policy():
user_name = 'test-user'
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:ListAllMyBuckets",
"Resource": "*"
}
]
}
group_attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "s3:List*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_attached_policy(user_name, attached_policy_document)
create_group_with_attached_policy_and_add_user(user_name, group_attached_policy_document)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.list_buckets()
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('Access Denied')
@set_initial_no_auth_action_count(6)
@mock_s3
@mock_iam
def test_s3_access_denied_with_denying_inline_group_policy():
user_name = 'test-user'
bucket_name = 'test-bucket'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*"
}
]
}
group_inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "s3:GetObject",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
create_group_with_inline_policy_and_add_user(user_name, group_inline_policy_document)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.get_object(Bucket=bucket_name, Key='sdfsdf')
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('Access Denied')
@set_initial_no_auth_action_count(10)
@mock_iam
@mock_ec2
def test_access_denied_with_many_irrelevant_policies():
user_name = 'test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:Describe*",
"Resource": "*"
}
]
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": "*"
}
]
}
group_inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "iam:List*",
"Resource": "*"
}
]
}
group_attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"Action": "lambda:*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_multiple_policies(user_name, inline_policy_document,
attached_policy_document)
create_group_with_multiple_policies_and_add_user(user_name, group_inline_policy_document,
group_attached_policy_document)
client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
with assert_raises(ClientError) as ex:
client.create_key_pair(KeyName="TestKey")
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:iam::{account_id}:user/{user_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
user_name=user_name,
operation="ec2:CreateKeyPair"
)
)
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_ec2
@mock_elbv2
def test_allowed_with_temporary_credentials():
role_name = 'test-role'
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"elasticloadbalancing:CreateLoadBalancer",
"ec2:DescribeSubnets"
],
"Resource": "*"
}
]
}
credentials = create_role_with_attached_policy_and_assume_it(role_name, trust_policy_document, attached_policy_document)
elbv2_client = boto3.client('elbv2', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
ec2_client = boto3.client('ec2', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
subnets = ec2_client.describe_subnets()['Subnets']
len(subnets).should.be.greater_than(1)
elbv2_client.create_load_balancer(
Name='test-load-balancer',
Subnets=[
subnets[0]['SubnetId'],
subnets[1]['SubnetId']
]
)['LoadBalancers'].should.have.length_of(1)
@set_initial_no_auth_action_count(3)
@mock_iam
@mock_sts
@mock_rds2
def test_access_denied_with_temporary_credentials():
role_name = 'test-role'
session_name = 'test-session'
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
'rds:Describe*'
],
"Resource": "*"
}
]
}
credentials = create_role_with_inline_policy_and_assume_it(role_name, trust_policy_document,
attached_policy_document, session_name)
client = boto3.client('rds', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken'])
with assert_raises(ClientError) as ex:
client.create_db_instance(
DBInstanceIdentifier='test-db-instance',
DBInstanceClass='db.t3',
Engine='aurora-postgresql'
)
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal(
'User: arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name} is not authorized to perform: {operation}'.format(
account_id=ACCOUNT_ID,
role_name=role_name,
session_name=session_name,
operation="rds:CreateDBInstance"
)
)
@set_initial_no_auth_action_count(3)
@mock_iam
def test_get_user_from_credentials():
user_name = 'new-test-user'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iam:*",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
client = boto3.client('iam', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.get_user()['User']['UserName'].should.equal(user_name)
@set_initial_no_auth_action_count(0)
@mock_s3
def test_s3_invalid_access_key_id():
client = boto3.client('s3', region_name='us-east-1', aws_access_key_id='invalid', aws_secret_access_key='invalid')
with assert_raises(ClientError) as ex:
client.list_buckets()
ex.exception.response['Error']['Code'].should.equal('InvalidAccessKeyId')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The AWS Access Key Id you provided does not exist in our records.')
@set_initial_no_auth_action_count(3)
@mock_s3
@mock_iam
def test_s3_signature_does_not_match():
bucket_name = 'test-bucket'
access_key = create_user_with_access_key()
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key='invalid')
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.put_object(Bucket=bucket_name, Key="abc")
ex.exception.response['Error']['Code'].should.equal('SignatureDoesNotMatch')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('The request signature we calculated does not match the signature you provided. Check your key and signing method.')
@set_initial_no_auth_action_count(7)
@mock_s3
@mock_iam
def test_s3_access_denied_not_action():
user_name = 'test-user'
bucket_name = 'test-bucket'
inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*"
}
]
}
group_inline_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Deny",
"NotAction": "iam:GetUser",
"Resource": "*"
}
]
}
access_key = create_user_with_access_key_and_inline_policy(user_name, inline_policy_document)
create_group_with_inline_policy_and_add_user(user_name, group_inline_policy_document)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=access_key['AccessKeyId'],
aws_secret_access_key=access_key['SecretAccessKey'])
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.delete_object(Bucket=bucket_name, Key='sdfsdf')
ex.exception.response['Error']['Code'].should.equal('AccessDenied')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(403)
ex.exception.response['Error']['Message'].should.equal('Access Denied')
@set_initial_no_auth_action_count(4)
@mock_iam
@mock_sts
@mock_s3
def test_s3_invalid_token_with_temporary_credentials():
role_name = 'test-role'
session_name = 'test-session'
bucket_name = 'test-bucket-888'
trust_policy_document = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::{account_id}:root".format(account_id=ACCOUNT_ID)},
"Action": "sts:AssumeRole"
}
}
attached_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
'*'
],
"Resource": "*"
}
]
}
credentials = create_role_with_inline_policy_and_assume_it(role_name, trust_policy_document,
attached_policy_document, session_name)
client = boto3.client('s3', region_name='us-east-1',
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token='invalid')
client.create_bucket(Bucket=bucket_name)
with assert_raises(ClientError) as ex:
client.list_bucket_metrics_configurations(Bucket=bucket_name)
ex.exception.response['Error']['Code'].should.equal('InvalidToken')
ex.exception.response['ResponseMetadata']['HTTPStatusCode'].should.equal(400)
ex.exception.response['Error']['Message'].should.equal('The provided token is malformed or otherwise invalid.')