2020-04-16 03:08:44 +00:00
|
|
|
from base64 import b64decode
|
2013-05-24 21:22:34 +00:00
|
|
|
import datetime
|
2020-04-16 03:08:44 +00:00
|
|
|
import xmltodict
|
2017-03-12 04:41:12 +00:00
|
|
|
from moto.core import BaseBackend, BaseModel
|
2022-06-04 11:30:16 +00:00
|
|
|
from moto.core.utils import iso_8601_datetime_with_milliseconds, BackendDict
|
2022-05-08 22:25:40 +00:00
|
|
|
from moto.core import get_account_id
|
2019-10-31 15:44:26 +00:00
|
|
|
from moto.sts.utils import (
|
|
|
|
random_access_key_id,
|
|
|
|
random_secret_access_key,
|
|
|
|
random_session_token,
|
|
|
|
random_assumed_role_id,
|
2020-12-08 09:08:40 +00:00
|
|
|
DEFAULT_STS_SESSION_DURATION,
|
2019-10-31 15:44:26 +00:00
|
|
|
)
|
2022-06-09 17:40:22 +00:00
|
|
|
from typing import Mapping
|
2013-05-24 21:22:34 +00:00
|
|
|
|
|
|
|
|
2017-03-12 04:41:12 +00:00
|
|
|
class Token(BaseModel):
|
2022-03-11 21:28:45 +00:00
|
|
|
def __init__(self, duration, name=None):
|
2016-09-07 18:40:52 +00:00
|
|
|
now = datetime.datetime.utcnow()
|
2013-05-24 21:22:34 +00:00
|
|
|
self.expiration = now + datetime.timedelta(seconds=duration)
|
2014-03-20 16:29:39 +00:00
|
|
|
self.name = name
|
|
|
|
self.policy = None
|
2013-05-24 21:22:34 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def expiration_ISO8601(self):
|
2014-11-30 04:34:40 +00:00
|
|
|
return iso_8601_datetime_with_milliseconds(self.expiration)
|
2013-05-24 21:22:34 +00:00
|
|
|
|
|
|
|
|
2017-03-12 04:41:12 +00:00
|
|
|
class AssumedRole(BaseModel):
|
2013-05-24 21:22:34 +00:00
|
|
|
def __init__(self, role_session_name, role_arn, policy, duration, external_id):
|
|
|
|
self.session_name = role_session_name
|
2019-08-21 10:57:45 +00:00
|
|
|
self.role_arn = role_arn
|
2013-05-24 21:22:34 +00:00
|
|
|
self.policy = policy
|
2016-09-07 18:40:52 +00:00
|
|
|
now = datetime.datetime.utcnow()
|
2013-05-24 21:22:34 +00:00
|
|
|
self.expiration = now + datetime.timedelta(seconds=duration)
|
|
|
|
self.external_id = external_id
|
2019-07-08 14:32:25 +00:00
|
|
|
self.access_key_id = "ASIA" + random_access_key_id()
|
|
|
|
self.secret_access_key = random_secret_access_key()
|
|
|
|
self.session_token = random_session_token()
|
2019-08-21 08:45:36 +00:00
|
|
|
self.assumed_role_id = "AROA" + random_assumed_role_id()
|
2013-05-24 21:22:34 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def expiration_ISO8601(self):
|
2014-11-30 04:34:40 +00:00
|
|
|
return iso_8601_datetime_with_milliseconds(self.expiration)
|
2013-05-24 21:22:34 +00:00
|
|
|
|
2019-08-21 10:20:35 +00:00
|
|
|
@property
|
|
|
|
def user_id(self):
|
|
|
|
return self.assumed_role_id + ":" + self.session_name
|
|
|
|
|
2019-08-21 10:57:45 +00:00
|
|
|
@property
|
|
|
|
def arn(self):
|
2022-03-10 14:39:59 +00:00
|
|
|
return (
|
|
|
|
"arn:aws:sts::{account_id}:assumed-role/{role_name}/{session_name}".format(
|
2022-05-08 22:25:40 +00:00
|
|
|
account_id=get_account_id(),
|
2022-03-10 14:39:59 +00:00
|
|
|
role_name=self.role_arn.split("/")[-1],
|
|
|
|
session_name=self.session_name,
|
|
|
|
)
|
2019-08-21 17:47:12 +00:00
|
|
|
)
|
2019-08-21 10:57:45 +00:00
|
|
|
|
2013-05-24 21:22:34 +00:00
|
|
|
|
|
|
|
class STSBackend(BaseBackend):
|
2022-06-04 11:30:16 +00:00
|
|
|
def __init__(self, region_name, account_id):
|
|
|
|
super().__init__(region_name, account_id)
|
2019-07-08 17:57:14 +00:00
|
|
|
self.assumed_roles = []
|
|
|
|
|
2021-09-24 16:01:09 +00:00
|
|
|
@staticmethod
|
|
|
|
def default_vpc_endpoint_service(service_region, zones):
|
|
|
|
"""Default VPC endpoint service."""
|
|
|
|
return BaseBackend.default_vpc_endpoint_service_factory(
|
|
|
|
service_region, zones, "sts"
|
|
|
|
)
|
|
|
|
|
2013-05-24 21:22:34 +00:00
|
|
|
def get_session_token(self, duration):
|
|
|
|
token = Token(duration=duration)
|
|
|
|
return token
|
|
|
|
|
2022-03-11 21:28:45 +00:00
|
|
|
def get_federation_token(self, name, duration):
|
|
|
|
token = Token(duration=duration, name=name)
|
2014-03-20 16:29:39 +00:00
|
|
|
return token
|
|
|
|
|
2013-05-24 21:22:34 +00:00
|
|
|
def assume_role(self, **kwargs):
|
|
|
|
role = AssumedRole(**kwargs)
|
2019-07-08 17:57:14 +00:00
|
|
|
self.assumed_roles.append(role)
|
2013-05-24 21:22:34 +00:00
|
|
|
return role
|
|
|
|
|
2019-08-21 10:20:35 +00:00
|
|
|
def get_assumed_role_from_access_key(self, access_key_id):
|
|
|
|
for assumed_role in self.assumed_roles:
|
|
|
|
if assumed_role.access_key_id == access_key_id:
|
|
|
|
return assumed_role
|
|
|
|
return None
|
|
|
|
|
2019-07-16 03:27:47 +00:00
|
|
|
def assume_role_with_web_identity(self, **kwargs):
|
|
|
|
return self.assume_role(**kwargs)
|
|
|
|
|
2020-04-16 03:08:44 +00:00
|
|
|
def assume_role_with_saml(self, **kwargs):
|
|
|
|
del kwargs["principal_arn"]
|
|
|
|
saml_assertion_encoded = kwargs.pop("saml_assertion")
|
|
|
|
saml_assertion_decoded = b64decode(saml_assertion_encoded)
|
2020-12-08 09:08:40 +00:00
|
|
|
|
|
|
|
namespaces = {
|
|
|
|
"urn:oasis:names:tc:SAML:2.0:protocol": "samlp",
|
|
|
|
"urn:oasis:names:tc:SAML:2.0:assertion": "saml",
|
|
|
|
}
|
|
|
|
saml_assertion = xmltodict.parse(
|
|
|
|
saml_assertion_decoded.decode("utf-8"),
|
|
|
|
force_cdata=True,
|
|
|
|
process_namespaces=True,
|
|
|
|
namespaces=namespaces,
|
2022-03-03 11:51:00 +00:00
|
|
|
namespace_separator="|",
|
2020-04-16 03:08:44 +00:00
|
|
|
)
|
2020-12-08 09:08:40 +00:00
|
|
|
|
2022-03-03 11:51:00 +00:00
|
|
|
saml_assertion_attributes = saml_assertion["samlp|Response"]["saml|Assertion"][
|
|
|
|
"saml|AttributeStatement"
|
|
|
|
]["saml|Attribute"]
|
2020-12-08 09:08:40 +00:00
|
|
|
for attribute in saml_assertion_attributes:
|
|
|
|
if (
|
|
|
|
attribute["@Name"]
|
|
|
|
== "https://aws.amazon.com/SAML/Attributes/RoleSessionName"
|
|
|
|
):
|
2022-03-03 11:51:00 +00:00
|
|
|
kwargs["role_session_name"] = attribute["saml|AttributeValue"]["#text"]
|
2020-12-08 09:08:40 +00:00
|
|
|
if (
|
|
|
|
attribute["@Name"]
|
|
|
|
== "https://aws.amazon.com/SAML/Attributes/SessionDuration"
|
|
|
|
):
|
2022-03-03 11:51:00 +00:00
|
|
|
kwargs["duration"] = int(attribute["saml|AttributeValue"]["#text"])
|
2020-12-08 09:08:40 +00:00
|
|
|
|
|
|
|
if "duration" not in kwargs:
|
|
|
|
kwargs["duration"] = DEFAULT_STS_SESSION_DURATION
|
|
|
|
|
2020-04-16 03:08:44 +00:00
|
|
|
kwargs["external_id"] = None
|
|
|
|
kwargs["policy"] = None
|
|
|
|
role = AssumedRole(**kwargs)
|
|
|
|
self.assumed_roles.append(role)
|
|
|
|
return role
|
|
|
|
|
2021-01-27 18:54:21 +00:00
|
|
|
def get_caller_identity(self):
|
|
|
|
# Logic resides in responses.py
|
|
|
|
# Fake method here to make implementation coverage script aware that this method is implemented
|
|
|
|
pass
|
|
|
|
|
2017-02-24 02:37:43 +00:00
|
|
|
|
2022-06-09 17:40:22 +00:00
|
|
|
sts_backends: Mapping[str, STSBackend] = BackendDict(
|
2022-06-04 11:30:16 +00:00
|
|
|
STSBackend, "sts", use_boto3_regions=False, additional_regions=["global"]
|
|
|
|
)
|