Implemented authentication for services except for S3.

This commit is contained in:
acsbendi 2019-07-02 17:40:08 +02:00
parent 0b51dd47f3
commit fbd0749854
4 changed files with 266 additions and 18 deletions

189
moto/core/authentication.py Normal file
View File

@ -0,0 +1,189 @@
import json
from enum import Enum
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from moto.iam.models import ACCOUNT_ID, Policy
from moto.iam import iam_backend
from moto.core.exceptions import SignatureDoesNotMatchError, AccessDeniedError, InvalidClientTokenIdError
ACCESS_KEY_STORE = {
"AKIAJDULPKHCC4KGTYVA": {
"owner": "avatao-user",
"secret_access_key": "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"
}
}
class IAMRequest:
def __init__(self, method, path, data, headers):
print(f"Creating {IAMRequest.__name__} with method={method}, path={path}, data={data}, headers={headers}")
self._method = method
self._path = path
self._data = data
self._headers = headers
credential_scope = self._get_string_between('Credential=', ',', self._headers['Authorization'])
credential_data = credential_scope.split('/')
self._access_key = credential_data[0]
self._region = credential_data[2]
self._service = credential_data[3]
self._action = self._service + ":" + self._data["Action"][0]
def check_signature(self):
original_signature = self._get_string_between('Signature=', ',', self._headers['Authorization'])
calculated_signature = self._calculate_signature()
if original_signature != calculated_signature:
raise SignatureDoesNotMatchError()
def check_action_permitted(self):
iam_user_name = ACCESS_KEY_STORE[self._access_key]["owner"]
user_policies = self._collect_policies_for_iam_user(iam_user_name)
permitted = False
for policy in user_policies:
iam_policy = IAMPolicy(policy)
permission_result = iam_policy.is_action_permitted(self._action)
if permission_result == PermissionResult.DENIED:
self._raise_access_denied(iam_user_name)
elif permission_result == PermissionResult.PERMITTED:
permitted = True
if not permitted:
self._raise_access_denied(iam_user_name)
def _raise_access_denied(self, iam_user_name):
raise AccessDeniedError(
account_id=ACCOUNT_ID,
iam_user_name=iam_user_name,
action=self._action
)
@staticmethod
def _collect_policies_for_iam_user(iam_user_name):
user_policies = []
inline_policy_names = iam_backend.list_user_policies(iam_user_name)
for inline_policy_name in inline_policy_names:
inline_policy = iam_backend.get_user_policy(iam_user_name, inline_policy_name)
user_policies.append(inline_policy)
attached_policies, _ = iam_backend.list_attached_user_policies(iam_user_name)
user_policies += attached_policies
user_groups = iam_backend.get_groups_for_user(iam_user_name)
for user_group in user_groups:
inline_group_policy_names = iam_backend.list_group_policies(user_group)
for inline_group_policy_name in inline_group_policy_names:
inline_user_group_policy = iam_backend.get_group_policy(user_group.name, inline_group_policy_name)
user_policies.append(inline_user_group_policy)
attached_group_policies = iam_backend.list_attached_group_policies(user_group.name)
user_policies += attached_group_policies
return user_policies
def _create_auth(self):
if self._access_key not in ACCESS_KEY_STORE:
raise InvalidClientTokenIdError()
secret_key = ACCESS_KEY_STORE[self._access_key]["secret_access_key"]
credentials = Credentials(self._access_key, secret_key)
return SigV4Auth(credentials, self._service, self._region)
@staticmethod
def _create_headers_for_aws_request(signed_headers, original_headers):
headers = {}
for key, value in original_headers.items():
if key.lower() in signed_headers:
headers[key] = value
return headers
def _create_aws_request(self):
signed_headers = self._get_string_between('SignedHeaders=', ',', self._headers['Authorization']).split(';')
headers = self._create_headers_for_aws_request(signed_headers, self._headers)
request = AWSRequest(method=self._method, url=self._path, data=self._data, headers=headers)
request.context['timestamp'] = headers['X-Amz-Date']
return request
def _calculate_signature(self):
auth = self._create_auth()
request = self._create_aws_request()
canonical_request = auth.canonical_request(request)
string_to_sign = auth.string_to_sign(request, canonical_request)
return auth.signature(string_to_sign, request)
@staticmethod
def _get_string_between(first_separator, second_separator, string):
return string.partition(first_separator)[2].partition(second_separator)[0]
class IAMPolicy:
def __init__(self, policy):
self._policy = policy
def is_action_permitted(self, action):
if isinstance(self._policy, Policy):
default_version = next(policy_version for policy_version in self._policy.versions if policy_version.is_default)
policy_document = default_version.document
else:
policy_document = self._policy["policy_document"]
policy_json = json.loads(policy_document)
permitted = False
for policy_statement in policy_json["Statement"]:
iam_policy_statement = IAMPolicyStatement(policy_statement)
permission_result = iam_policy_statement.is_action_permitted(action)
if permission_result == PermissionResult.DENIED:
return permission_result
elif permission_result == PermissionResult.PERMITTED:
permitted = True
if permitted:
return PermissionResult.PERMITTED
else:
return PermissionResult.NEUTRAL
class IAMPolicyStatement:
def __init__(self, statement):
self._statement = statement
def is_action_permitted(self, action):
is_action_concerned = False
if "NotAction" in self._statement:
if not self._check_element_contains("NotAction", action):
is_action_concerned = True
else: # Action is present
if self._check_element_contains("Action", action):
is_action_concerned = True
# TODO: check Resource/NotResource and Condition
if is_action_concerned:
if self._statement["Effect"] == "Allow":
return PermissionResult.PERMITTED
else: # Deny
return PermissionResult.DENIED
else:
return PermissionResult.NEUTRAL
def _check_element_contains(self, statement_element, value):
if isinstance(self._statement[statement_element], list):
return value in self._statement[statement_element]
else: # string
return value == self._statement[statement_element]
class PermissionResult(Enum):
PERMITTED = 1
DENIED = 2
NEUTRAL = 3

View File

@ -65,3 +65,34 @@ class JsonRESTError(RESTError):
def get_body(self, *args, **kwargs):
return self.description
class SignatureDoesNotMatchError(RESTError):
code = 400
def __init__(self):
super(SignatureDoesNotMatchError, self).__init__(
'SignatureDoesNotMatch',
"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.")
class InvalidClientTokenIdError(RESTError):
code = 400
def __init__(self):
super(InvalidClientTokenIdError, self).__init__(
'InvalidClientTokenId',
"The security token included in the request is invalid.")
class AccessDeniedError(RESTError):
code = 403
def __init__(self, account_id, iam_user_name, action):
super(AccessDeniedError, self).__init__(
'AccessDenied',
"User: arn:aws:iam::{account_id}:user/{iam_user_name} is not authorized to perform: {operation}".format(
account_id=account_id,
iam_user_name=iam_user_name,
operation=action
))

View File

@ -8,6 +8,8 @@ import re
import io
import pytz
from moto.core.authentication import IAMRequest
from moto.core.exceptions import DryRunClientError
from jinja2 import Environment, DictLoader, TemplateNotFound
@ -103,7 +105,22 @@ class _TemplateEnvironmentMixin(object):
return self.environment.get_template(template_id)
class BaseResponse(_TemplateEnvironmentMixin):
class ActionAuthenticatorMixin(object):
INITIALIZATION_STEP_COUNT = 5
request_count = 0
def _authenticate_action(self):
iam_request = IAMRequest(method=self.method, path=self.path, data=self.querystring, headers=self.headers)
iam_request.check_signature()
if ActionAuthenticatorMixin.request_count >= ActionAuthenticatorMixin.INITIALIZATION_STEP_COUNT:
iam_request.check_action_permitted()
else:
ActionAuthenticatorMixin.request_count += 1
class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
default_region = 'us-east-1'
# to extract region, use [^.]
@ -273,6 +290,13 @@ class BaseResponse(_TemplateEnvironmentMixin):
def call_action(self):
headers = self.response_headers
try:
self._authenticate_action()
except HTTPException as http_error:
response = http_error.description, dict(status=http_error.code)
return self._send_response(headers, response)
action = camelcase_to_underscores(self._get_action())
method_names = method_names_from_class(self.__class__)
if action in method_names:
@ -285,16 +309,7 @@ class BaseResponse(_TemplateEnvironmentMixin):
if isinstance(response, six.string_types):
return 200, headers, response
else:
if len(response) == 2:
body, new_headers = response
else:
status, new_headers, body = response
status = new_headers.get('status', 200)
headers.update(new_headers)
# Cast status to string
if "status" in headers:
headers['status'] = str(headers['status'])
return status, headers, body
return self._send_response(headers, response)
if not action:
return 404, headers, ''
@ -302,6 +317,19 @@ class BaseResponse(_TemplateEnvironmentMixin):
raise NotImplementedError(
"The {0} action has not been implemented".format(action))
@staticmethod
def _send_response(headers, response):
if len(response) == 2:
body, new_headers = response
else:
status, new_headers, body = response
status = new_headers.get('status', 200)
headers.update(new_headers)
# Cast status to string
if "status" in headers:
headers['status'] = str(headers['status'])
return status, headers, body
def _get_param(self, param_name, if_none=None):
val = self.querystring.get(param_name)
if val is not None:

View File

@ -1,7 +1,9 @@
from __future__ import unicode_literals
from moto.core.responses import BaseResponse
from .models import iam_backend, User
from .models import iam_backend
AVATAO_USER_NAME = "avatao-user"
class IamResponse(BaseResponse):
@ -425,11 +427,10 @@ class IamResponse(BaseResponse):
def get_user(self):
user_name = self._get_param('UserName')
if user_name:
user = iam_backend.get_user(user_name)
else:
user = User(name='default_user')
# If no user is specific, IAM returns the current user
if not user_name:
user_name = AVATAO_USER_NAME
# If no user is specified, IAM returns the current user
user = iam_backend.get_user(user_name)
template = self.response_template(USER_TEMPLATE)
return template.render(action='Get', user=user)
@ -457,7 +458,6 @@ class IamResponse(BaseResponse):
def create_login_profile(self):
user_name = self._get_param('UserName')
password = self._get_param('Password')
password = self._get_param('Password')
user = iam_backend.create_login_profile(user_name, password)
template = self.response_template(CREATE_LOGIN_PROFILE_TEMPLATE)