diff --git a/moto/iam/models.py b/moto/iam/models.py
index 82f1c2bca..73d58b996 100644
--- a/moto/iam/models.py
+++ b/moto/iam/models.py
@@ -1,5 +1,6 @@
from __future__ import unicode_literals
import base64
+import hashlib
import os
import random
import string
@@ -475,6 +476,20 @@ class AccessKey(BaseModel):
raise UnformattedGetAttTemplateException()
+class SshPublicKey(BaseModel):
+ def __init__(self, user_name, ssh_public_key_body):
+ self.user_name = user_name
+ self.ssh_public_key_body = ssh_public_key_body
+ self.ssh_public_key_id = "APKA" + random_access_key()
+ self.fingerprint = hashlib.md5(ssh_public_key_body.encode()).hexdigest()
+ self.status = "Active"
+ self.upload_date = datetime.utcnow()
+
+ @property
+ def uploaded_iso_8601(self):
+ return iso_8601_datetime_without_milliseconds(self.upload_date)
+
+
class Group(BaseModel):
def __init__(self, name, path="/"):
self.name = name
@@ -536,6 +551,7 @@ class User(BaseModel):
self.policies = {}
self.managed_policies = {}
self.access_keys = []
+ self.ssh_public_keys = []
self.password = None
self.password_reset_required = False
self.signing_certificates = {}
@@ -605,6 +621,33 @@ class User(BaseModel):
"The Access Key with id {0} cannot be found".format(access_key_id)
)
+ def upload_ssh_public_key(self, ssh_public_key_body):
+ pubkey = SshPublicKey(self.name, ssh_public_key_body)
+ self.ssh_public_keys.append(pubkey)
+ return pubkey
+
+ def get_ssh_public_key(self, ssh_public_key_id):
+ for key in self.ssh_public_keys:
+ if key.ssh_public_key_id == ssh_public_key_id:
+ return key
+ else:
+ raise IAMNotFoundException(
+ "The SSH Public Key with id {0} cannot be found".format(
+ ssh_public_key_id
+ )
+ )
+
+ def get_all_ssh_public_keys(self):
+ return self.ssh_public_keys
+
+ def update_ssh_public_key(self, ssh_public_key_id, status):
+ key = self.get_ssh_public_key(ssh_public_key_id)
+ key.status = status
+
+ def delete_ssh_public_key(self, ssh_public_key_id):
+ key = self.get_ssh_public_key(ssh_public_key_id)
+ self.ssh_public_keys.remove(key)
+
def get_cfn_attribute(self, attribute_name):
from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
@@ -1584,6 +1627,26 @@ class IAMBackend(BaseBackend):
user = self.get_user(user_name)
user.delete_access_key(access_key_id)
+ def upload_ssh_public_key(self, user_name, ssh_public_key_body):
+ user = self.get_user(user_name)
+ return user.upload_ssh_public_key(ssh_public_key_body)
+
+ def get_ssh_public_key(self, user_name, ssh_public_key_id):
+ user = self.get_user(user_name)
+ return user.get_ssh_public_key(ssh_public_key_id)
+
+ def get_all_ssh_public_keys(self, user_name):
+ user = self.get_user(user_name)
+ return user.get_all_ssh_public_keys()
+
+ def update_ssh_public_key(self, user_name, ssh_public_key_id, status):
+ user = self.get_user(user_name)
+ return user.update_ssh_public_key(ssh_public_key_id, status)
+
+ def delete_ssh_public_key(self, user_name, ssh_public_key_id):
+ user = self.get_user(user_name)
+ return user.delete_ssh_public_key(ssh_public_key_id)
+
def enable_mfa_device(
self, user_name, serial_number, authentication_code_1, authentication_code_2
):
diff --git a/moto/iam/responses.py b/moto/iam/responses.py
index f3a2f356d..ea14bef0f 100644
--- a/moto/iam/responses.py
+++ b/moto/iam/responses.py
@@ -590,6 +590,46 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name="DeleteAccessKey")
+ def upload_ssh_public_key(self):
+ user_name = self._get_param("UserName")
+ ssh_public_key_body = self._get_param("SSHPublicKeyBody")
+
+ key = iam_backend.upload_ssh_public_key(user_name, ssh_public_key_body)
+ template = self.response_template(UPLOAD_SSH_PUBLIC_KEY_TEMPLATE)
+ return template.render(key=key)
+
+ def get_ssh_public_key(self):
+ user_name = self._get_param("UserName")
+ ssh_public_key_id = self._get_param("SSHPublicKeyId")
+
+ key = iam_backend.get_ssh_public_key(user_name, ssh_public_key_id)
+ template = self.response_template(GET_SSH_PUBLIC_KEY_TEMPLATE)
+ return template.render(key=key)
+
+ def list_ssh_public_keys(self):
+ user_name = self._get_param("UserName")
+
+ keys = iam_backend.get_all_ssh_public_keys(user_name)
+ template = self.response_template(LIST_SSH_PUBLIC_KEYS_TEMPLATE)
+ return template.render(keys=keys)
+
+ def update_ssh_public_key(self):
+ user_name = self._get_param("UserName")
+ ssh_public_key_id = self._get_param("SSHPublicKeyId")
+ status = self._get_param("Status")
+
+ iam_backend.update_ssh_public_key(user_name, ssh_public_key_id, status)
+ template = self.response_template(UPDATE_SSH_PUBLIC_KEY_TEMPLATE)
+ return template.render()
+
+ def delete_ssh_public_key(self):
+ user_name = self._get_param("UserName")
+ ssh_public_key_id = self._get_param("SSHPublicKeyId")
+
+ iam_backend.delete_ssh_public_key(user_name, ssh_public_key_id)
+ template = self.response_template(DELETE_SSH_PUBLIC_KEY_TEMPLATE)
+ return template.render()
+
def deactivate_mfa_device(self):
user_name = self._get_param("UserName")
serial_number = self._get_param("SerialNumber")
@@ -1696,6 +1736,73 @@ GET_ACCESS_KEY_LAST_USED_TEMPLATE = """
"""
+UPLOAD_SSH_PUBLIC_KEY_TEMPLATE = """
+
+
+ {{ key.user_name }}
+ {{ key.ssh_public_key_body }}
+ {{ key.ssh_public_key_id }}
+ {{ key.fingerprint }}
+ {{ key.status }}
+ {{ key.uploaded_iso_8601 }}
+
+
+
+ 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE
+
+"""
+
+GET_SSH_PUBLIC_KEY_TEMPLATE = """
+
+
+ {{ key.user_name }}
+ {{ key.ssh_public_key_body }}
+ {{ key.ssh_public_key_id }}
+ {{ key.fingerprint }}
+ {{ key.status }}
+ {{ key.uploaded_iso_8601 }}
+
+
+
+ 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE
+
+"""
+
+LIST_SSH_PUBLIC_KEYS_TEMPLATE = """
+
+
+ {% for key in keys %}
+
+ {{ key.user_name }}
+ {{ key.ssh_public_key_id }}
+ {{ key.status }}
+ {{ key.uploaded_iso_8601 }}
+
+ {% endfor %}
+
+ false
+
+
+ 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE
+
+"""
+
+UPDATE_SSH_PUBLIC_KEY_TEMPLATE = """
+
+
+
+ 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE
+
+"""
+
+DELETE_SSH_PUBLIC_KEY_TEMPLATE = """
+
+
+
+ 7a62c49f-347e-4fc4-9331-6e8eEXAMPLE
+
+"""
+
CREDENTIAL_REPORT_GENERATING = """
diff --git a/tests/test_iam/test_iam.py b/tests/test_iam/test_iam.py
index 93622b2ec..366ea3620 100644
--- a/tests/test_iam/test_iam.py
+++ b/tests/test_iam/test_iam.py
@@ -1316,6 +1316,122 @@ def test_get_access_key_last_used():
resp["UserName"].should.equal(create_key_response["UserName"])
+@mock_iam
+def test_upload_ssh_public_key():
+ iam = boto3.resource("iam", region_name="us-east-1")
+ client = iam.meta.client
+ username = "test-user"
+ iam.create_user(UserName=username)
+ public_key = MOCK_CERT
+
+ resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
+ pubkey = resp["SSHPublicKey"]
+ pubkey["SSHPublicKeyBody"].should.equal(public_key)
+ pubkey["UserName"].should.equal(username)
+ pubkey["SSHPublicKeyId"].should.have.length_of(20)
+ assert pubkey["SSHPublicKeyId"].startswith("APKA")
+ pubkey.should.have.key("Fingerprint")
+ pubkey["Status"].should.equal("Active")
+ (
+ datetime.utcnow() - pubkey["UploadDate"].replace(tzinfo=None)
+ ).seconds.should.be.within(0, 10)
+
+
+@mock_iam
+def test_get_ssh_public_key():
+ iam = boto3.resource("iam", region_name="us-east-1")
+ client = iam.meta.client
+ username = "test-user"
+ iam.create_user(UserName=username)
+ public_key = MOCK_CERT
+
+ with assert_raises(ClientError):
+ client.get_ssh_public_key(
+ UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Encoding="SSH"
+ )
+
+ resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
+ ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
+
+ resp = client.get_ssh_public_key(
+ UserName=username, SSHPublicKeyId=ssh_public_key_id, Encoding="SSH"
+ )
+ resp["SSHPublicKey"]["SSHPublicKeyBody"].should.equal(public_key)
+
+
+@mock_iam
+def test_list_ssh_public_keys():
+ iam = boto3.resource("iam", region_name="us-east-1")
+ client = iam.meta.client
+ username = "test-user"
+ iam.create_user(UserName=username)
+ public_key = MOCK_CERT
+
+ resp = client.list_ssh_public_keys(UserName=username)
+ resp["SSHPublicKeys"].should.have.length_of(0)
+
+ resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
+ ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
+
+ resp = client.list_ssh_public_keys(UserName=username)
+ resp["SSHPublicKeys"].should.have.length_of(1)
+ resp["SSHPublicKeys"][0]["SSHPublicKeyId"].should.equal(ssh_public_key_id)
+
+
+@mock_iam
+def test_update_ssh_public_key():
+ iam = boto3.resource("iam", region_name="us-east-1")
+ client = iam.meta.client
+ username = "test-user"
+ iam.create_user(UserName=username)
+ public_key = MOCK_CERT
+
+ with assert_raises(ClientError):
+ client.update_ssh_public_key(
+ UserName=username, SSHPublicKeyId="xxnon-existent-keyxx", Status="Inactive"
+ )
+
+ resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
+ ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
+ resp["SSHPublicKey"]["Status"].should.equal("Active")
+
+ resp = client.update_ssh_public_key(
+ UserName=username, SSHPublicKeyId=ssh_public_key_id, Status="Inactive"
+ )
+
+ resp = client.get_ssh_public_key(
+ UserName=username, SSHPublicKeyId=ssh_public_key_id, Encoding="SSH"
+ )
+ resp["SSHPublicKey"]["Status"].should.equal("Inactive")
+
+
+@mock_iam
+def test_delete_ssh_public_key():
+ iam = boto3.resource("iam", region_name="us-east-1")
+ client = iam.meta.client
+ username = "test-user"
+ iam.create_user(UserName=username)
+ public_key = MOCK_CERT
+
+ with assert_raises(ClientError):
+ client.delete_ssh_public_key(
+ UserName=username, SSHPublicKeyId="xxnon-existent-keyxx"
+ )
+
+ resp = client.upload_ssh_public_key(UserName=username, SSHPublicKeyBody=public_key)
+ ssh_public_key_id = resp["SSHPublicKey"]["SSHPublicKeyId"]
+
+ resp = client.list_ssh_public_keys(UserName=username)
+ resp["SSHPublicKeys"].should.have.length_of(1)
+
+ resp = client.delete_ssh_public_key(
+ UserName=username, SSHPublicKeyId=ssh_public_key_id
+ )
+
+ resp = client.list_ssh_public_keys(UserName=username)
+ resp["SSHPublicKeys"].should.have.length_of(0)
+
+
@mock_iam
def test_get_account_authorization_details():
test_policy = json.dumps(