Add the ssh_public_key methods that were missing from the IAM implementation
This commit is contained in:
parent
6922606398
commit
d9c8bdb2a0
@ -1,5 +1,6 @@
|
||||
from __future__ import unicode_literals
|
||||
import base64
|
||||
import hashlib
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
@ -475,6 +476,27 @@ class AccessKey(BaseModel):
|
||||
raise UnformattedGetAttTemplateException()
|
||||
|
||||
|
||||
class SshPublicKey(BaseModel):
|
||||
def __init__(self, user_name, ssh_public_key_body):
|
||||
self.user_name = user_name
|
||||
self.ssh_public_key_body = ssh_public_key_body
|
||||
self.ssh_public_key_id = "APKA" + random_access_key()
|
||||
self.fingerprint = hashlib.md5(ssh_public_key_body.encode()).hexdigest()
|
||||
self.status = "Active"
|
||||
self.upload_date = datetime.utcnow()
|
||||
|
||||
@property
|
||||
def uploaded_iso_8601(self):
|
||||
return iso_8601_datetime_without_milliseconds(self.upload_date)
|
||||
|
||||
def get_cfn_attribute(self, attribute_name):
|
||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||
|
||||
if attribute_name == "PublicKeyBody":
|
||||
return self.public_key_body
|
||||
raise UnformattedGetAttTemplateException()
|
||||
|
||||
|
||||
class Group(BaseModel):
|
||||
def __init__(self, name, path="/"):
|
||||
self.name = name
|
||||
@ -536,6 +558,7 @@ class User(BaseModel):
|
||||
self.policies = {}
|
||||
self.managed_policies = {}
|
||||
self.access_keys = []
|
||||
self.ssh_public_keys = []
|
||||
self.password = None
|
||||
self.password_reset_required = False
|
||||
self.signing_certificates = {}
|
||||
@ -605,6 +628,33 @@ class User(BaseModel):
|
||||
"The Access Key with id {0} cannot be found".format(access_key_id)
|
||||
)
|
||||
|
||||
def upload_ssh_public_key(self, ssh_public_key_body):
|
||||
pubkey = SshPublicKey(self.name, ssh_public_key_body)
|
||||
self.ssh_public_keys.append(pubkey)
|
||||
return pubkey
|
||||
|
||||
def get_ssh_public_key(self, ssh_public_key_id):
|
||||
for key in self.ssh_public_keys:
|
||||
if key.ssh_public_key_id == ssh_public_key_id:
|
||||
return key
|
||||
else:
|
||||
raise IAMNotFoundException(
|
||||
"The SSH Public Key with id {0} cannot be found".format(
|
||||
ssh_public_key_id
|
||||
)
|
||||
)
|
||||
|
||||
def get_all_ssh_public_keys(self):
|
||||
return self.ssh_public_keys
|
||||
|
||||
def update_ssh_public_key(self, ssh_public_key_id, status):
|
||||
key = self.get_ssh_public_key(ssh_public_key_id)
|
||||
key.status = status
|
||||
|
||||
def delete_ssh_public_key(self, ssh_public_key_id):
|
||||
key = self.get_ssh_public_key(ssh_public_key_id)
|
||||
self.ssh_public_keys.remove(key)
|
||||
|
||||
def get_cfn_attribute(self, attribute_name):
|
||||
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
|
||||
|
||||
@ -1584,6 +1634,26 @@ class IAMBackend(BaseBackend):
|
||||
user = self.get_user(user_name)
|
||||
user.delete_access_key(access_key_id)
|
||||
|
||||
def upload_ssh_public_key(self, user_name, ssh_public_key_body):
|
||||
user = self.get_user(user_name)
|
||||
return user.upload_ssh_public_key(ssh_public_key_body)
|
||||
|
||||
def get_ssh_public_key(self, user_name, ssh_public_key_id):
|
||||
user = self.get_user(user_name)
|
||||
return user.get_ssh_public_key(ssh_public_key_id)
|
||||
|
||||
def get_all_ssh_public_keys(self, user_name):
|
||||
user = self.get_user(user_name)
|
||||
return user.get_all_ssh_public_keys()
|
||||
|
||||
def update_ssh_public_key(self, user_name, ssh_public_key_id, status):
|
||||
user = self.get_user(user_name)
|
||||
return user.update_ssh_public_key(ssh_public_key_id, status)
|
||||
|
||||
def delete_ssh_public_key(self, user_name, ssh_public_key_id):
|
||||
user = self.get_user(user_name)
|
||||
return user.delete_ssh_public_key(ssh_public_key_id)
|
||||
|
||||
def enable_mfa_device(
|
||||
self, user_name, serial_number, authentication_code_1, authentication_code_2
|
||||
):
|
||||
|
@ -590,6 +590,46 @@ class IamResponse(BaseResponse):
|
||||
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
|
||||
return template.render(name="DeleteAccessKey")
|
||||
|
||||
def upload_ssh_public_key(self):
|
||||
user_name = self._get_param("UserName")
|
||||
ssh_public_key_body = self._get_param("SSHPublicKeyBody")
|
||||
|
||||
key = iam_backend.upload_ssh_public_key(user_name, ssh_public_key_body)
|
||||
template = self.response_template(UPLOAD_SSH_PUBLIC_KEY_TEMPLATE)
|
||||
return template.render(key=key)
|
||||
|
||||
def get_ssh_public_key(self):
|
||||
user_name = self._get_param("UserName")
|
||||
ssh_public_key_id = self._get_param("SSHPublicKeyId")
|
||||
|
||||
key = iam_backend.get_ssh_public_key(user_name, ssh_public_key_id)
|
||||
template = self.response_template(GET_SSH_PUBLIC_KEY_TEMPLATE)
|
||||
return template.render(key=key)
|
||||
|
||||
def list_ssh_public_keys(self):
|
||||
user_name = self._get_param("UserName")
|
||||
|
||||
keys = iam_backend.get_all_ssh_public_keys(user_name)
|
||||
template = self.response_template(LIST_SSH_PUBLIC_KEYS_TEMPLATE)
|
||||
return template.render(keys=keys)
|
||||
|
||||
def update_ssh_public_key(self):
|
||||
user_name = self._get_param("UserName")
|
||||
ssh_public_key_id = self._get_param("SSHPublicKeyId")
|
||||
status = self._get_param("Status")
|
||||
|
||||
iam_backend.update_ssh_public_key(user_name, ssh_public_key_id, status)
|
||||
template = self.response_template(UPDATE_SSH_PUBLIC_KEY_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def delete_ssh_public_key(self):
|
||||
user_name = self._get_param("UserName")
|
||||
ssh_public_key_id = self._get_param("SSHPublicKeyId")
|
||||
|
||||
iam_backend.delete_ssh_public_key(user_name, ssh_public_key_id)
|
||||
template = self.response_template(DELETE_SSH_PUBLIC_KEY_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def deactivate_mfa_device(self):
|
||||
user_name = self._get_param("UserName")
|
||||
serial_number = self._get_param("SerialNumber")
|
||||
@ -1696,6 +1736,73 @@ GET_ACCESS_KEY_LAST_USED_TEMPLATE = """
|
||||
</GetAccessKeyLastUsedResponse>
|
||||
"""
|
||||
|
||||
UPLOAD_SSH_PUBLIC_KEY_TEMPLATE = """<UploadSSHPublicKeyResponse>
|
||||
<UploadSSHPublicKeyResult>
|
||||
<SSHPublicKey>
|
||||
<UserName>{{ key.user_name }}</UserName>
|
||||
<SSHPublicKeyBody>{{ key.ssh_public_key_body }}</SSHPublicKeyBody>
|
||||
<SSHPublicKeyId>{{ key.ssh_public_key_id }}</SSHPublicKeyId>
|
||||
<Fingerprint>{{ key.fingerprint }}</Fingerprint>
|
||||
<Status>{{ key.status }}</Status>
|
||||
<UploadDate>{{ key.uploaded_iso_8601 }}</UploadDate>
|
||||
</SSHPublicKey>
|
||||
</UploadSSHPublicKeyResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</UploadSSHPublicKeyResponse>"""
|
||||
|
||||
GET_SSH_PUBLIC_KEY_TEMPLATE = """<GetSSHPublicKeyResponse>
|
||||
<GetSSHPublicKeyResult>
|
||||
<SSHPublicKey>
|
||||
<UserName>{{ key.user_name }}</UserName>
|
||||
<SSHPublicKeyBody>{{ key.ssh_public_key_body }}</SSHPublicKeyBody>
|
||||
<SSHPublicKeyId>{{ key.ssh_public_key_id }}</SSHPublicKeyId>
|
||||
<Fingerprint>{{ key.fingerprint }}</Fingerprint>
|
||||
<Status>{{ key.status }}</Status>
|
||||
<UploadDate>{{ key.uploaded_iso_8601 }}</UploadDate>
|
||||
</SSHPublicKey>
|
||||
</GetSSHPublicKeyResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</GetSSHPublicKeyResponse>"""
|
||||
|
||||
LIST_SSH_PUBLIC_KEYS_TEMPLATE = """<ListSSHPublicKeysResponse>
|
||||
<ListSSHPublicKeysResult>
|
||||
<SSHPublicKeys>
|
||||
{% for key in keys %}
|
||||
<member>
|
||||
<UserName>{{ key.user_name }}</UserName>
|
||||
<SSHPublicKeyId>{{ key.ssh_public_key_id }}</SSHPublicKeyId>
|
||||
<Status>{{ key.status }}</Status>
|
||||
<UploadDate>{{ key.uploaded_iso_8601 }}</UploadDate>
|
||||
</member>
|
||||
{% endfor %}
|
||||
</SSHPublicKeys>
|
||||
<IsTruncated>false</IsTruncated>
|
||||
</ListSSHPublicKeysResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</ListSSHPublicKeysResponse>"""
|
||||
|
||||
UPDATE_SSH_PUBLIC_KEY_TEMPLATE = """<UpdateSSHPublicKeyResponse>
|
||||
<UpdateSSHPublicKeyResult>
|
||||
</UpdateSSHPublicKeyResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</UpdateSSHPublicKeyResponse>"""
|
||||
|
||||
DELETE_SSH_PUBLIC_KEY_TEMPLATE = """<DeleteSSHPublicKeyResponse>
|
||||
<DeleteSSHPublicKeyResult>
|
||||
</DeleteSSHPublicKeyResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</DeleteSSHPublicKeyResponse>"""
|
||||
|
||||
CREDENTIAL_REPORT_GENERATING = """
|
||||
<GenerateCredentialReportResponse>
|
||||
<GenerateCredentialReportResult>
|
||||
|
@ -1316,6 +1316,122 @@ def test_get_access_key_last_used():
|
||||
resp["UserName"].should.equal(create_key_response["UserName"])
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_upload_ssh_public_key():
|
||||
iam = boto3.resource("iam", region_name="us-east-1")
|
||||
client = iam.meta.client
|
||||
username = "test-user"
|
||||
iam.create_user(UserName=username)
|
||||
public_key = MOCK_CERT
|
||||
|
||||
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||
pubkey = resp["SSHPublicKey"]
|
||||
pubkey["SSHPublicKeyBody"].should.equal(public_key)
|
||||
pubkey["UserName"].should.equal(username)
|
||||
pubkey["SSHPublicKeyId"].should.have.length_of(20)
|
||||
assert pubkey["SSHPublicKeyId"].startswith("APKA")
|
||||
pubkey.should.have.key("Fingerprint")
|
||||
pubkey["Status"].should.equal("Active")
|
||||
(
|
||||
datetime.utcnow() - pubkey["UploadDate"].replace(tzinfo=None)
|
||||
).seconds.should.be.within(0, 10)
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_get_ssh_public_key():
|
||||
iam = boto3.resource("iam", region_name="us-east-1")
|
||||
client = iam.meta.client
|
||||
username = "test-user"
|
||||
iam.create_user(UserName=username)
|
||||
public_key = MOCK_CERT
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.get_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Encoding="SSH"
|
||||
)
|
||||
|
||||
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||
|
||||
resp = client.get_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId=ssh_public_key_id, Encoding="SSH"
|
||||
)
|
||||
resp["SSHPublicKey"]["SSHPublicKeyBody"].should.equal(public_key)
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_list_ssh_public_keys():
|
||||
iam = boto3.resource("iam", region_name="us-east-1")
|
||||
client = iam.meta.client
|
||||
username = "test-user"
|
||||
iam.create_user(UserName=username)
|
||||
public_key = MOCK_CERT
|
||||
|
||||
resp = client.list_ssh_public_keys(UserName=username)
|
||||
resp["SSHPublicKeys"].should.have.length_of(0)
|
||||
|
||||
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||
|
||||
resp = client.list_ssh_public_keys(UserName=username)
|
||||
resp["SSHPublicKeys"].should.have.length_of(1)
|
||||
resp["SSHPublicKeys"][0]["SSHPublicKeyId"].should.equal(ssh_public_key_id)
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_update_ssh_public_key():
|
||||
iam = boto3.resource("iam", region_name="us-east-1")
|
||||
client = iam.meta.client
|
||||
username = "test-user"
|
||||
iam.create_user(UserName=username)
|
||||
public_key = MOCK_CERT
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.update_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Status="Inactive"
|
||||
)
|
||||
|
||||
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||
resp["SSHPublicKey"]["Status"].should.equal("Active")
|
||||
|
||||
resp = client.update_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId=ssh_public_key_id, Status="Inactive"
|
||||
)
|
||||
|
||||
resp = client.get_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId=ssh_public_key_id, Encoding="SSH"
|
||||
)
|
||||
resp["SSHPublicKey"]["Status"].should.equal("Inactive")
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_delete_ssh_public_key():
|
||||
iam = boto3.resource("iam", region_name="us-east-1")
|
||||
client = iam.meta.client
|
||||
username = "test-user"
|
||||
iam.create_user(UserName=username)
|
||||
public_key = MOCK_CERT
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.delete_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId="xxnon-existent-keyxx"
|
||||
)
|
||||
|
||||
resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
|
||||
ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
|
||||
|
||||
resp = client.list_ssh_public_keys(UserName=username)
|
||||
resp["SSHPublicKeys"].should.have.length_of(1)
|
||||
|
||||
resp = client.delete_ssh_public_key(
|
||||
UserName=username, SSHPublicKeyId=ssh_public_key_id
|
||||
)
|
||||
|
||||
resp = client.list_ssh_public_keys(UserName=username)
|
||||
resp["SSHPublicKeys"].should.have.length_of(0)
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_get_account_authorization_details():
|
||||
test_policy = json.dumps(
|
||||
|
Loading…
Reference in New Issue
Block a user