Merge pull request #2573 from cloudtruth/add_iam_ssh_public_key_methods
Add the ssh_public_key methods that were missing from IAM
This commit is contained in:
commit
9ccc9ef1fd
@ -1,5 +1,6 @@
|
|||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
import base64
|
import base64
|
||||||
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
import string
|
import string
|
||||||
@ -475,6 +476,20 @@ class AccessKey(BaseModel):
|
|||||||
raise UnformattedGetAttTemplateException()
|
raise UnformattedGetAttTemplateException()
|
||||||
|
|
||||||
|
|
||||||
|
class SshPublicKey(BaseModel):
|
||||||
|
def __init__(self, user_name, ssh_public_key_body):
|
||||||
|
self.user_name = user_name
|
||||||
|
self.ssh_public_key_body = ssh_public_key_body
|
||||||
|
self.ssh_public_key_id = "APKA" + random_access_key()
|
||||||
|
self.fingerprint = hashlib.md5(ssh_public_key_body.encode()).hexdigest()
|
||||||
|
self.status = "Active"
|
||||||
|
self.upload_date = datetime.utcnow()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uploaded_iso_8601(self):
|
||||||
|
return iso_8601_datetime_without_milliseconds(self.upload_date)
|
||||||
|
|
||||||
|
|
||||||
class Group(BaseModel):
|
class Group(BaseModel):
|
||||||
def __init__(self, name, path="/"):
|
def __init__(self, name, path="/"):
|
||||||
self.name = name
|
self.name = name
|
||||||
@ -536,6 +551,7 @@ class User(BaseModel):
|
|||||||
self.policies = {}
|
self.policies = {}
|
||||||
self.managed_policies = {}
|
self.managed_policies = {}
|
||||||
self.access_keys = []
|
self.access_keys = []
|
||||||
|
self.ssh_public_keys = []
|
||||||
self.password = None
|
self.password = None
|
||||||
self.password_reset_required = False
|
self.password_reset_required = False
|
||||||
self.signing_certificates = {}
|
self.signing_certificates = {}
|
||||||
@ -605,6 +621,33 @@ class User(BaseModel):
|
|||||||
"The Access Key with id {0} cannot be found".format(access_key_id)
|
"The Access Key with id {0} cannot be found".format(access_key_id)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def upload_ssh_public_key(self, ssh_public_key_body):
|
||||||
|
pubkey = SshPublicKey(self.name, ssh_public_key_body)
|
||||||
|
self.ssh_public_keys.append(pubkey)
|
||||||
|
return pubkey
|
||||||
|
|
||||||
|
def get_ssh_public_key(self, ssh_public_key_id):
|
||||||
|
for key in self.ssh_public_keys:
|
||||||
|
if key.ssh_public_key_id == ssh_public_key_id:
|
||||||
|
return key
|
||||||
|
else:
|
||||||
|
raise IAMNotFoundException(
|
||||||
|
"The SSH Public Key with id {0} cannot be found".format(
|
||||||
|
ssh_public_key_id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_all_ssh_public_keys(self):
|
||||||
|
return self.ssh_public_keys
|
||||||
|
|
||||||
|
def update_ssh_public_key(self, ssh_public_key_id, status):
|
||||||
|
key = self.get_ssh_public_key(ssh_public_key_id)
|
||||||
|
key.status = status
|
||||||
|
|
||||||
|
def delete_ssh_public_key(self, ssh_public_key_id):
|
||||||
|
key = self.get_ssh_public_key(ssh_public_key_id)
|
||||||
|
self.ssh_public_keys.remove(key)
|
||||||
|
|
||||||
def get_cfn_attribute(self, attribute_name):
|
def get_cfn_attribute(self, attribute_name):
|
||||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||||
|
|
||||||
@ -1584,6 +1627,26 @@ class IAMBackend(BaseBackend):
|
|||||||
user = self.get_user(user_name)
|
user = self.get_user(user_name)
|
||||||
user.delete_access_key(access_key_id)
|
user.delete_access_key(access_key_id)
|
||||||
|
|
||||||
|
def upload_ssh_public_key(self, user_name, ssh_public_key_body):
|
||||||
|
user = self.get_user(user_name)
|
||||||
|
return user.upload_ssh_public_key(ssh_public_key_body)
|
||||||
|
|
||||||
|
def get_ssh_public_key(self, user_name, ssh_public_key_id):
|
||||||
|
user = self.get_user(user_name)
|
||||||
|
return user.get_ssh_public_key(ssh_public_key_id)
|
||||||
|
|
||||||
|
def get_all_ssh_public_keys(self, user_name):
|
||||||
|
user = self.get_user(user_name)
|
||||||
|
return user.get_all_ssh_public_keys()
|
||||||
|
|
||||||
|
def update_ssh_public_key(self, user_name, ssh_public_key_id, status):
|
||||||
|
user = self.get_user(user_name)
|
||||||
|
return user.update_ssh_public_key(ssh_public_key_id, status)
|
||||||
|
|
||||||
|
def delete_ssh_public_key(self, user_name, ssh_public_key_id):
|
||||||
|
user = self.get_user(user_name)
|
||||||
|
return user.delete_ssh_public_key(ssh_public_key_id)
|
||||||
|
|
||||||
def enable_mfa_device(
|
def enable_mfa_device(
|
||||||
self, user_name, serial_number, authentication_code_1, authentication_code_2
|
self, user_name, serial_number, authentication_code_1, authentication_code_2
|
||||||
):
|
):
|
||||||
|
@ -590,6 +590,46 @@ class IamResponse(BaseResponse):
|
|||||||
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
|
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
|
||||||
return template.render(name="DeleteAccessKey")
|
return template.render(name="DeleteAccessKey")
|
||||||
|
|
||||||
|
def upload_ssh_public_key(self):
|
||||||
|
user_name = self._get_param("UserName")
|
||||||
|
ssh_public_key_body = self._get_param("SSHPublicKeyBody")
|
||||||
|
|
||||||
|
key = iam_backend.upload_ssh_public_key(user_name, ssh_public_key_body)
|
||||||
|
template = self.response_template(UPLOAD_SSH_PUBLIC_KEY_TEMPLATE)
|
||||||
|
return template.render(key=key)
|
||||||
|
|
||||||
|
def get_ssh_public_key(self):
|
||||||
|
user_name = self._get_param("UserName")
|
||||||
|
ssh_public_key_id = self._get_param("SSHPublicKeyId")
|
||||||
|
|
||||||
|
key = iam_backend.get_ssh_public_key(user_name, ssh_public_key_id)
|
||||||
|
template = self.response_template(GET_SSH_PUBLIC_KEY_TEMPLATE)
|
||||||
|
return template.render(key=key)
|
||||||
|
|
||||||
|
def list_ssh_public_keys(self):
|
||||||
|
user_name = self._get_param("UserName")
|
||||||
|
|
||||||
|
keys = iam_backend.get_all_ssh_public_keys(user_name)
|
||||||
|
template = self.response_template(LIST_SSH_PUBLIC_KEYS_TEMPLATE)
|
||||||
|
return template.render(keys=keys)
|
||||||
|
|
||||||
|
def update_ssh_public_key(self):
|
||||||
|
user_name = self._get_param("UserName")
|
||||||
|
ssh_public_key_id = self._get_param("SSHPublicKeyId")
|
||||||
|
status = self._get_param("Status")
|
||||||
|
|
||||||
|
iam_backend.update_ssh_public_key(user_name, ssh_public_key_id, status)
|
||||||
|
template = self.response_template(UPDATE_SSH_PUBLIC_KEY_TEMPLATE)
|
||||||
|
return template.render()
|
||||||
|
|
||||||
|
def delete_ssh_public_key(self):
|
||||||
|
user_name = self._get_param("UserName")
|
||||||
|
ssh_public_key_id = self._get_param("SSHPublicKeyId")
|
||||||
|
|
||||||
|
iam_backend.delete_ssh_public_key(user_name, ssh_public_key_id)
|
||||||
|
template = self.response_template(DELETE_SSH_PUBLIC_KEY_TEMPLATE)
|
||||||
|
return template.render()
|
||||||
|
|
||||||
def deactivate_mfa_device(self):
|
def deactivate_mfa_device(self):
|
||||||
user_name = self._get_param("UserName")
|
user_name = self._get_param("UserName")
|
||||||
serial_number = self._get_param("SerialNumber")
|
serial_number = self._get_param("SerialNumber")
|
||||||
@ -1696,6 +1736,73 @@ GET_ACCESS_KEY_LAST_USED_TEMPLATE = """
|
|||||||
</GetAccessKeyLastUsedResponse>
|
</GetAccessKeyLastUsedResponse>
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
UPLOAD_SSH_PUBLIC_KEY_TEMPLATE = """<UploadSSHPublicKeyResponse>
|
||||||
|
<UploadSSHPublicKeyResult>
|
||||||
|
<SSHPublicKey>
|
||||||
|
<UserName>{{ key.user_name }}</UserName>
|
||||||
|
<SSHPublicKeyBody>{{ key.ssh_public_key_body }}</SSHPublicKeyBody>
|
||||||
|
<SSHPublicKeyId>{{ key.ssh_public_key_id }}</SSHPublicKeyId>
|
||||||
|
<Fingerprint>{{ key.fingerprint }}</Fingerprint>
|
||||||
|
<Status>{{ key.status }}</Status>
|
||||||
|
<UploadDate>{{ key.uploaded_iso_8601 }}</UploadDate>
|
||||||
|
</SSHPublicKey>
|
||||||
|
</UploadSSHPublicKeyResult>
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</UploadSSHPublicKeyResponse>"""
|
||||||
|
|
||||||
|
GET_SSH_PUBLIC_KEY_TEMPLATE = """<GetSSHPublicKeyResponse>
|
||||||
|
<GetSSHPublicKeyResult>
|
||||||
|
<SSHPublicKey>
|
||||||
|
<UserName>{{ key.user_name }}</UserName>
|
||||||
|
<SSHPublicKeyBody>{{ key.ssh_public_key_body }}</SSHPublicKeyBody>
|
||||||
|
<SSHPublicKeyId>{{ key.ssh_public_key_id }}</SSHPublicKeyId>
|
||||||
|
<Fingerprint>{{ key.fingerprint }}</Fingerprint>
|
||||||
|
<Status>{{ key.status }}</Status>
|
||||||
|
<UploadDate>{{ key.uploaded_iso_8601 }}</UploadDate>
|
||||||
|
</SSHPublicKey>
|
||||||
|
</GetSSHPublicKeyResult>
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</GetSSHPublicKeyResponse>"""
|
||||||
|
|
||||||
|
LIST_SSH_PUBLIC_KEYS_TEMPLATE = """<ListSSHPublicKeysResponse>
|
||||||
|
<ListSSHPublicKeysResult>
|
||||||
|
<SSHPublicKeys>
|
||||||
|
{% for key in keys %}
|
||||||
|
<member>
|
||||||
|
<UserName>{{ key.user_name }}</UserName>
|
||||||
|
<SSHPublicKeyId>{{ key.ssh_public_key_id }}</SSHPublicKeyId>
|
||||||
|
<Status>{{ key.status }}</Status>
|
||||||
|
<UploadDate>{{ key.uploaded_iso_8601 }}</UploadDate>
|
||||||
|
</member>
|
||||||
|
{% endfor %}
|
||||||
|
</SSHPublicKeys>
|
||||||
|
<IsTruncated>false</IsTruncated>
|
||||||
|
</ListSSHPublicKeysResult>
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</ListSSHPublicKeysResponse>"""
|
||||||
|
|
||||||
|
UPDATE_SSH_PUBLIC_KEY_TEMPLATE = """<UpdateSSHPublicKeyResponse>
|
||||||
|
<UpdateSSHPublicKeyResult>
|
||||||
|
</UpdateSSHPublicKeyResult>
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</UpdateSSHPublicKeyResponse>"""
|
||||||
|
|
||||||
|
DELETE_SSH_PUBLIC_KEY_TEMPLATE = """<DeleteSSHPublicKeyResponse>
|
||||||
|
<DeleteSSHPublicKeyResult>
|
||||||
|
</DeleteSSHPublicKeyResult>
|
||||||
|
<ResponseMetadata>
|
||||||
|
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||||
|
</ResponseMetadata>
|
||||||
|
</DeleteSSHPublicKeyResponse>"""
|
||||||
|
|
||||||
CREDENTIAL_REPORT_GENERATING = """
|
CREDENTIAL_REPORT_GENERATING = """
|
||||||
<GenerateCredentialReportResponse>
|
<GenerateCredentialReportResponse>
|
||||||
<GenerateCredentialReportResult>
|
<GenerateCredentialReportResult>
|
||||||
|
@ -1316,6 +1316,122 @@ def test_get_access_key_last_used():
|
|||||||
resp["UserName"].should.equal(create_key_response["UserName"])
|
resp["UserName"].should.equal(create_key_response["UserName"])
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iam
|
||||||
|
def test_upload_ssh_public_key():
|
||||||
|
iam = boto3.resource("iam", region_name="us-east-1")
|
||||||
|
client = iam.meta.client
|
||||||
|
username = "test-user"
|
||||||
|
iam.create_user(UserName=username)
|
||||||
|
public_key = MOCK_CERT
|
||||||
|
|
||||||
|
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||||
|
pubkey = resp["SSHPublicKey"]
|
||||||
|
pubkey["SSHPublicKeyBody"].should.equal(public_key)
|
||||||
|
pubkey["UserName"].should.equal(username)
|
||||||
|
pubkey["SSHPublicKeyId"].should.have.length_of(20)
|
||||||
|
assert pubkey["SSHPublicKeyId"].startswith("APKA")
|
||||||
|
pubkey.should.have.key("Fingerprint")
|
||||||
|
pubkey["Status"].should.equal("Active")
|
||||||
|
(
|
||||||
|
datetime.utcnow() - pubkey["UploadDate"].replace(tzinfo=None)
|
||||||
|
).seconds.should.be.within(0, 10)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iam
|
||||||
|
def test_get_ssh_public_key():
|
||||||
|
iam = boto3.resource("iam", region_name="us-east-1")
|
||||||
|
client = iam.meta.client
|
||||||
|
username = "test-user"
|
||||||
|
iam.create_user(UserName=username)
|
||||||
|
public_key = MOCK_CERT
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
client.get_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Encoding="SSH"
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||||
|
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||||
|
|
||||||
|
resp = client.get_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId=ssh_public_key_id, Encoding="SSH"
|
||||||
|
)
|
||||||
|
resp["SSHPublicKey"]["SSHPublicKeyBody"].should.equal(public_key)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iam
|
||||||
|
def test_list_ssh_public_keys():
|
||||||
|
iam = boto3.resource("iam", region_name="us-east-1")
|
||||||
|
client = iam.meta.client
|
||||||
|
username = "test-user"
|
||||||
|
iam.create_user(UserName=username)
|
||||||
|
public_key = MOCK_CERT
|
||||||
|
|
||||||
|
resp = client.list_ssh_public_keys(UserName=username)
|
||||||
|
resp["SSHPublicKeys"].should.have.length_of(0)
|
||||||
|
|
||||||
|
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||||
|
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||||
|
|
||||||
|
resp = client.list_ssh_public_keys(UserName=username)
|
||||||
|
resp["SSHPublicKeys"].should.have.length_of(1)
|
||||||
|
resp["SSHPublicKeys"][0]["SSHPublicKeyId"].should.equal(ssh_public_key_id)
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iam
|
||||||
|
def test_update_ssh_public_key():
|
||||||
|
iam = boto3.resource("iam", region_name="us-east-1")
|
||||||
|
client = iam.meta.client
|
||||||
|
username = "test-user"
|
||||||
|
iam.create_user(UserName=username)
|
||||||
|
public_key = MOCK_CERT
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
client.update_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Status="Inactive"
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||||
|
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||||
|
resp["SSHPublicKey"]["Status"].should.equal("Active")
|
||||||
|
|
||||||
|
resp = client.update_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId=ssh_public_key_id, Status="Inactive"
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.get_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId=ssh_public_key_id, Encoding="SSH"
|
||||||
|
)
|
||||||
|
resp["SSHPublicKey"]["Status"].should.equal("Inactive")
|
||||||
|
|
||||||
|
|
||||||
|
@mock_iam
|
||||||
|
def test_delete_ssh_public_key():
|
||||||
|
iam = boto3.resource("iam", region_name="us-east-1")
|
||||||
|
client = iam.meta.client
|
||||||
|
username = "test-user"
|
||||||
|
iam.create_user(UserName=username)
|
||||||
|
public_key = MOCK_CERT
|
||||||
|
|
||||||
|
with assert_raises(ClientError):
|
||||||
|
client.delete_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId="xxnon-existent-keyxx"
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||||
|
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||||
|
|
||||||
|
resp = client.list_ssh_public_keys(UserName=username)
|
||||||
|
resp["SSHPublicKeys"].should.have.length_of(1)
|
||||||
|
|
||||||
|
resp = client.delete_ssh_public_key(
|
||||||
|
UserName=username, SSHPublicKeyId=ssh_public_key_id
|
||||||
|
)
|
||||||
|
|
||||||
|
resp = client.list_ssh_public_keys(UserName=username)
|
||||||
|
resp["SSHPublicKeys"].should.have.length_of(0)
|
||||||
|
|
||||||
|
|
||||||
@mock_iam
|
@mock_iam
|
||||||
def test_get_account_authorization_details():
|
def test_get_account_authorization_details():
|
||||||
test_policy = json.dumps(
|
test_policy = json.dumps(
|
||||||
|
Loading…
Reference in New Issue
Block a user