Add tag & untag user (#3638)

* Add iam.tag_user

* Add iam.untag_user

* Fix Python2 error
This commit is contained in:
Anton Grübel 2021-02-01 12:37:54 +01:00 committed by GitHub
parent f918635ab5
commit fe9f1dfe14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 165 additions and 81 deletions

View File

@ -45,6 +45,7 @@ from .utils import (
random_resource_id,
random_policy_id,
)
from ..utilities.tagging_service import TaggingService
class MFADevice(object):
@ -924,7 +925,7 @@ class Group(BaseModel):
class User(CloudFormationModel):
def __init__(self, name, path=None, tags=None):
def __init__(self, name, path=None):
self.name = name
self.id = random_resource_id()
self.path = path if path else "/"
@ -937,7 +938,6 @@ class User(CloudFormationModel):
self.password = None
self.password_reset_required = False
self.signing_certificates = {}
self.tags = tags
@property
def arn(self):
@ -1135,7 +1135,8 @@ class User(CloudFormationModel):
):
properties = cloudformation_json.get("Properties", {})
path = properties.get("Path")
return iam_backend.create_user(resource_physical_name, path)
user, _ = iam_backend.create_user(resource_physical_name, path)
return user
@classmethod
def update_from_cloudformation_json(
@ -1415,6 +1416,8 @@ class IAMBackend(BaseBackend):
self.account_summary = AccountSummary(self)
self.inline_policies = {}
self.access_keys = {}
self.tagger = TaggingService()
super(IAMBackend, self).__init__()
def _init_managed_policies(self):
@ -1978,16 +1981,16 @@ class IAMBackend(BaseBackend):
"EntityAlreadyExists", "User {0} already exists".format(user_name)
)
user = User(user_name, path, tags)
user = User(user_name, path)
self.tagger.tag_resource(user.arn, tags or [])
self.users[user_name] = user
return user
return user, self.tagger.list_tags_for_resource(user.arn)
def get_user(self, user_name):
user = None
try:
user = self.users[user_name]
except KeyError:
raise IAMNotFoundException("User {0} not found".format(user_name))
def get_user(self, name):
user = self.users.get(name)
if not user:
raise NoSuchEntity("The user with name {} cannot be found.".format(name))
return user
@ -2147,7 +2150,7 @@ class IAMBackend(BaseBackend):
def list_user_tags(self, user_name):
user = self.get_user(user_name)
return user.tags
return self.tagger.list_tags_for_resource(user.arn)
def put_user_policy(self, user_name, policy_name, policy_json):
user = self.get_user(user_name)
@ -2204,7 +2207,7 @@ class IAMBackend(BaseBackend):
try: # User may have been deleted before their access key...
user = self.get_user(key.user_name)
user.delete_access_key(key.access_key_id)
except IAMNotFoundException:
except NoSuchEntity:
pass
del self.access_keys[name]
@ -2250,7 +2253,7 @@ class IAMBackend(BaseBackend):
"CreateDate": user.created_iso_8601,
"PasswordLastUsed": None, # not supported
"PermissionsBoundary": {}, # ToDo: add put_user_permissions_boundary() functionality
"Tags": {}, # ToDo: add tag_user() functionality
"Tags": self.tagger.list_tags_for_resource(user.arn)["Tags"],
}
user.enable_mfa_device(
@ -2355,6 +2358,7 @@ class IAMBackend(BaseBackend):
code="DeleteConflict",
message="Cannot delete entity, must delete policies first.",
)
self.tagger.delete_all_tags_for_resource(user.arn)
del self.users[user_name]
def report_generated(self):
@ -2574,5 +2578,15 @@ class IAMBackend(BaseBackend):
inline_policy.unapply_policy(self)
del self.inline_policies[policy_id]
def tag_user(self, name, tags):
user = self.get_user(name)
self.tagger.tag_resource(user.arn, tags)
def untag_user(self, name, tag_keys):
user = self.get_user(name)
self.tagger.untag_resource_using_names(user.arn, tag_keys)
iam_backend = IAMBackend()

View File

@ -471,9 +471,9 @@ class IamResponse(BaseResponse):
user_name = self._get_param("UserName")
path = self._get_param("Path")
tags = self._get_multi_param("Tags.member")
user = iam_backend.create_user(user_name, path, tags)
user, user_tags = iam_backend.create_user(user_name, path, tags)
template = self.response_template(USER_TEMPLATE)
return template.render(action="Create", user=user)
return template.render(action="Create", user=user, tags=user_tags["Tags"])
def get_user(self):
user_name = self._get_param("UserName")
@ -572,7 +572,7 @@ class IamResponse(BaseResponse):
user_name = self._get_param("UserName")
tags = iam_backend.list_user_tags(user_name)
template = self.response_template(LIST_USER_TAGS_TEMPLATE)
return template.render(user_tags=tags or [])
return template.render(user_tags=tags["Tags"])
def put_user_policy(self):
user_name = self._get_param("UserName")
@ -989,6 +989,24 @@ class IamResponse(BaseResponse):
template = self.response_template(GET_ACCOUNT_SUMMARY_TEMPLATE)
return template.render(summary_map=account_summary.summary_map)
def tag_user(self):
name = self._get_param("UserName")
tags = self._get_multi_param("Tags.member")
iam_backend.tag_user(name, tags)
template = self.response_template(TAG_USER_TEMPLATE)
return template.render()
def untag_user(self):
name = self._get_param("UserName")
tag_keys = self._get_multi_param("TagKeys.member")
iam_backend.untag_user(name, tag_keys)
template = self.response_template(UNTAG_USER_TEMPLATE)
return template.render()
LIST_ENTITIES_FOR_POLICY_TEMPLATE = """<ListEntitiesForPolicyResponse>
<ListEntitiesForPolicyResult>
@ -1684,9 +1702,9 @@ USER_TEMPLATE = """<{{ action }}UserResponse>
<UserId>{{ user.id }}</UserId>
<CreateDate>{{ user.created_iso_8601 }}</CreateDate>
<Arn>{{ user.arn }}</Arn>
{% if user.tags %}
{% if tags %}
<Tags>
{% for tag in user.tags %}
{% for tag in tags %}
<member>
<Key>{{ tag['Key'] }}</Key>
<Value>{{ tag['Value'] }}</Value>
@ -2039,13 +2057,23 @@ LIST_VIRTUAL_MFA_DEVICES_TEMPLATE = """<ListVirtualMFADevicesResponse xmlns="htt
{% if device.enable_date %}
<EnableDate>{{ device.enabled_iso_8601 }}</EnableDate>
{% endif %}
{% if device.user %}
{% if device.user_attribute %}
<User>
<Path>{{ device.user.path }}</Path>
<UserName>{{ device.user.name }}</UserName>
<UserId>{{ device.user.id }}</UserId>
<CreateDate>{{ device.user.created_iso_8601 }}</CreateDate>
<Arn>{{ device.user.arn }}</Arn>
<Path>{{ device.user_attribute.Path }}</Path>
<UserName>{{ device.user_attribute.UserName }}</UserName>
<UserId>{{ device.user_attribute.UserId }}</UserId>
<CreateDate>{{ device.user_attribute.CreateDate }}</CreateDate>
<Arn>{{ device.user_attribute.Arn }}</Arn>
{% if device.user_attribute.Tags %}
<Tags>
{% for tag in device.user_attribute.Tags %}
<member>
<Key>{{ tag['Key'] }}</Key>
<Value>{{ tag['Value'] }}</Value>
</member>
{% endfor %}
</Tags>
{% endif %}
</User>
{% endif %}
</member>
@ -2514,3 +2542,17 @@ GET_ACCOUNT_SUMMARY_TEMPLATE = """<GetAccountSummaryResponse xmlns="https://iam.
<RequestId>85cb9b90-ac28-11e4-a88d-97964EXAMPLE</RequestId>
</ResponseMetadata>
</GetAccountSummaryResponse>"""
TAG_USER_TEMPLATE = """<TagUserResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<ResponseMetadata>
<RequestId>EXAMPLE8-90ab-cdef-fedc-ba987EXAMPLE</RequestId>
</ResponseMetadata>
</TagUserResponse>"""
UNTAG_USER_TEMPLATE = """<UntagUserResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
<ResponseMetadata>
<RequestId>EXAMPLE8-90ab-cdef-fedc-ba987EXAMPLE</RequestId>
</ResponseMetadata>
</UntagUserResponse>"""

View File

@ -1140,8 +1140,9 @@ def test_enable_virtual_mfa_device():
client = boto3.client("iam", region_name="us-east-1")
response = client.create_virtual_mfa_device(VirtualMFADeviceName="test-device")
serial_number = response["VirtualMFADevice"]["SerialNumber"]
tags = [{"Key": "key", "Value": "value"}]
client.create_user(UserName="test-user")
client.create_user(UserName="test-user", Tags=tags)
client.enable_mfa_device(
UserName="test-user",
SerialNumber=serial_number,
@ -1165,6 +1166,7 @@ def test_enable_virtual_mfa_device():
"arn:aws:iam::{}:user/test-user".format(ACCOUNT_ID)
)
device["User"]["CreateDate"].should.be.a(datetime)
device["User"]["Tags"].should.equal(tags)
device["EnableDate"].should.be.a(datetime)
response["IsTruncated"].should_not.be.ok
@ -2924,7 +2926,7 @@ def test_list_user_tags():
],
)
response = conn.list_user_tags(UserName="kenny-bania")
response["Tags"].should.equal([])
response["Tags"].should.have.length_of(0)
response["IsTruncated"].should_not.be.ok
response = conn.list_user_tags(UserName="jackie-chiles")
@ -4047,3 +4049,80 @@ def test_create_user_with_tags():
resp = conn.create_user(UserName="test-create-user-no-tags")
assert "Tags" not in resp["User"]
@mock_iam
def test_tag_user():
# given
client = boto3.client("iam", region_name="eu-central-1")
name = "test-user"
tags = sorted(
[{"Key": "key", "Value": "value"}, {"Key": "key-2", "Value": "value-2"}],
key=lambda item: item["Key"],
)
client.create_user(UserName=name)
# when
client.tag_user(UserName=name, Tags=tags)
# then
response = client.list_user_tags(UserName=name)
sorted(response["Tags"], key=lambda item: item["Key"],).should.equal(tags)
@mock_iam
def test_tag_user_error_unknown_user_name():
# given
client = boto3.client("iam", region_name="eu-central-1")
name = "unknown"
# when
with pytest.raises(ClientError) as e:
client.tag_user(UserName=name, Tags=[{"Key": "key", "Value": "value"}])
# then
ex = e.value
ex.operation_name.should.equal("TagUser")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404)
ex.response["Error"]["Code"].should.contain("NoSuchEntity")
ex.response["Error"]["Message"].should.equal(
"The user with name {} cannot be found.".format(name)
)
@mock_iam
def test_untag_user():
# given
client = boto3.client("iam", region_name="eu-central-1")
name = "test-user"
client.create_user(
UserName=name,
Tags=[{"Key": "key", "Value": "value"}, {"Key": "key-2", "Value": "value"}],
)
# when
client.untag_user(UserName=name, TagKeys=["key-2"])
# then
response = client.list_user_tags(UserName=name)
response["Tags"].should.equal([{"Key": "key", "Value": "value"}])
@mock_iam
def test_untag_user_error_unknown_user_name():
# given
client = boto3.client("iam", region_name="eu-central-1")
name = "unknown"
# when
with pytest.raises(ClientError) as e:
client.untag_user(UserName=name, TagKeys=["key"])
# then
ex = e.value
ex.operation_name.should.equal("UntagUser")
ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(404)
ex.response["Error"]["Code"].should.contain("NoSuchEntity")
ex.response["Error"]["Message"].should.equal(
"The user with name {} cannot be found.".format(name)
)

View File

@ -967,59 +967,6 @@ Outputs:
pass
@mock_iam
@mock_cloudformation
def test_iam_cloudformation_delete_users_access_key():
cf_client = boto3.client("cloudformation", region_name="us-east-1")
stack_name = "MyStack"
template = """
Resources:
TheUser:
Type: AWS::IAM::User
TheAccessKey:
Type: AWS::IAM::AccessKey
Properties:
UserName: !Ref TheUser
""".strip()
cf_client.create_stack(StackName=stack_name, TemplateBody=template)
provisioned_resources = cf_client.list_stack_resources(StackName=stack_name)[
"StackResourceSummaries"
]
provisioned_user = [
resource
for resource in provisioned_resources
if resource["LogicalResourceId"] == "TheUser"
][0]
user_name = provisioned_user["PhysicalResourceId"]
provisioned_access_key = [
resource
for resource in provisioned_resources
if resource["LogicalResourceId"] == "TheAccessKey"
][0]
access_key_id = provisioned_access_key["PhysicalResourceId"]
iam_client = boto3.client("iam", region_name="us-east-1")
user = iam_client.get_user(UserName=user_name)
access_keys = iam_client.list_access_keys(UserName=user_name)
access_key_id.should.equal(access_keys["AccessKeyMetadata"][0]["AccessKeyId"])
cf_client.delete_stack(StackName=stack_name)
iam_client.get_user.when.called_with(UserName=user_name).should.throw(
iam_client.exceptions.NoSuchEntityException
)
iam_client.list_access_keys.when.called_with(UserName=user_name).should.throw(
iam_client.exceptions.NoSuchEntityException
)
@mock_iam
@mock_cloudformation
def test_iam_cloudformation_delete_users_access_key():
@ -1055,13 +1002,15 @@ def test_iam_cloudformation_delete_users_access_key():
for resource in provisioned_resources
if resource["LogicalResourceId"] == "TheAccessKey"
]
len(provisioned_access_keys).should.equal(1)
provisioned_access_keys.should.have.length_of(1)
access_key_id = provisioned_access_keys[0]["PhysicalResourceId"]
iam_client = boto3.client("iam", region_name="us-east-1")
user = iam_client.get_user(UserName=user_name)["User"]
user["UserName"].should.equal(user_name)
access_keys = iam_client.list_access_keys(UserName=user_name)
access_keys["AccessKeyMetadata"][0]["UserName"].should.equal(user_name)
access_key_id.should.equal(access_keys["AccessKeyMetadata"][0]["AccessKeyId"])
cf_client.delete_stack(StackName=stack_name)