Added IAM User signing certificate support
This commit is contained in:
parent
6e17ba51c6
commit
94b5438d76
@ -24,3 +24,11 @@ class IAMReportNotPresentException(RESTError):
|
||||
def __init__(self, message):
|
||||
super(IAMReportNotPresentException, self).__init__(
|
||||
"ReportNotPresent", message)
|
||||
|
||||
|
||||
class MalformedCertificate(RESTError):
|
||||
code = 400
|
||||
|
||||
def __init__(self, cert):
|
||||
super(MalformedCertificate, self).__init__(
|
||||
'MalformedCertificate', 'Certificate {cert} is malformed'.format(cert=cert))
|
||||
|
@ -1,14 +1,18 @@
|
||||
from __future__ import unicode_literals
|
||||
import base64
|
||||
import sys
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
import pytz
|
||||
from moto.core import BaseBackend, BaseModel
|
||||
from moto.core.utils import iso_8601_datetime_without_milliseconds
|
||||
|
||||
from .aws_managed_policies import aws_managed_policies_data
|
||||
from .exceptions import IAMNotFoundException, IAMConflictException, IAMReportNotPresentException
|
||||
from .exceptions import IAMNotFoundException, IAMConflictException, IAMReportNotPresentException, MalformedCertificate
|
||||
from .utils import random_access_key, random_alphanumeric, random_resource_id, random_policy_id
|
||||
|
||||
ACCOUNT_ID = 123456789012
|
||||
@ -215,6 +219,16 @@ class Certificate(BaseModel):
|
||||
return "arn:aws:iam::{0}:server-certificate{1}{2}".format(ACCOUNT_ID, self.path, self.cert_name)
|
||||
|
||||
|
||||
class SigningCertificate(BaseModel):
|
||||
|
||||
def __init__(self, id, user_name, body):
|
||||
self.id = id
|
||||
self.user_name = user_name
|
||||
self.body = body
|
||||
self.upload_date = datetime.strftime(datetime.utcnow(), "%Y-%m-%d-%H-%M-%S")
|
||||
self.status = 'Active'
|
||||
|
||||
|
||||
class AccessKey(BaseModel):
|
||||
|
||||
def __init__(self, user_name):
|
||||
@ -299,6 +313,7 @@ class User(BaseModel):
|
||||
self.access_keys = []
|
||||
self.password = None
|
||||
self.password_reset_required = False
|
||||
self.signing_certificates = {}
|
||||
|
||||
@property
|
||||
def arn(self):
|
||||
@ -767,6 +782,48 @@ class IAMBackend(BaseBackend):
|
||||
|
||||
return users
|
||||
|
||||
def upload_signing_certificate(self, user_name, body):
|
||||
user = self.get_user(user_name)
|
||||
cert_id = random_resource_id(size=32)
|
||||
|
||||
# Validate the signing cert:
|
||||
try:
|
||||
if sys.version_info < (3, 0):
|
||||
data = bytes(body)
|
||||
else:
|
||||
data = bytes(body, 'utf8')
|
||||
|
||||
x509.load_pem_x509_certificate(data, default_backend())
|
||||
|
||||
except Exception:
|
||||
raise MalformedCertificate(body)
|
||||
|
||||
user.signing_certificates[cert_id] = SigningCertificate(cert_id, user_name, body)
|
||||
|
||||
return user.signing_certificates[cert_id]
|
||||
|
||||
def delete_signing_certificate(self, user_name, cert_id):
|
||||
user = self.get_user(user_name)
|
||||
|
||||
try:
|
||||
del user.signing_certificates[cert_id]
|
||||
except KeyError:
|
||||
raise IAMNotFoundException("The Certificate with id {id} cannot be found.".format(id=cert_id))
|
||||
|
||||
def list_signing_certificates(self, user_name):
|
||||
user = self.get_user(user_name)
|
||||
|
||||
return list(user.signing_certificates.values())
|
||||
|
||||
def update_signing_certificate(self, user_name, cert_id, status):
|
||||
user = self.get_user(user_name)
|
||||
|
||||
try:
|
||||
user.signing_certificates[cert_id].status = status
|
||||
|
||||
except KeyError:
|
||||
raise IAMNotFoundException("The Certificate with id {id} cannot be found.".format(id=cert_id))
|
||||
|
||||
def create_login_profile(self, user_name, password):
|
||||
# This does not currently deal with PasswordPolicyViolation.
|
||||
user = self.get_user(user_name)
|
||||
|
@ -552,6 +552,38 @@ class IamResponse(BaseResponse):
|
||||
roles=account_details['roles']
|
||||
)
|
||||
|
||||
def upload_signing_certificate(self):
|
||||
user_name = self._get_param('UserName')
|
||||
cert_body = self._get_param('CertificateBody')
|
||||
|
||||
cert = iam_backend.upload_signing_certificate(user_name, cert_body)
|
||||
template = self.response_template(UPLOAD_SIGNING_CERTIFICATE_TEMPLATE)
|
||||
return template.render(cert=cert)
|
||||
|
||||
def update_signing_certificate(self):
|
||||
user_name = self._get_param('UserName')
|
||||
cert_id = self._get_param('CertificateId')
|
||||
status = self._get_param('Status')
|
||||
|
||||
iam_backend.update_signing_certificate(user_name, cert_id, status)
|
||||
template = self.response_template(UPDATE_SIGNING_CERTIFICATE_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def delete_signing_certificate(self):
|
||||
user_name = self._get_param('UserName')
|
||||
cert_id = self._get_param('CertificateId')
|
||||
|
||||
iam_backend.delete_signing_certificate(user_name, cert_id)
|
||||
template = self.response_template(DELETE_SIGNING_CERTIFICATE_TEMPLATE)
|
||||
return template.render()
|
||||
|
||||
def list_signing_certificates(self):
|
||||
user_name = self._get_param('UserName')
|
||||
|
||||
certs = iam_backend.list_signing_certificates(user_name)
|
||||
template = self.response_template(LIST_SIGNING_CERTIFICATES_TEMPLATE)
|
||||
return template.render(user_name=user_name, certificates=certs)
|
||||
|
||||
|
||||
ATTACH_ROLE_POLICY_TEMPLATE = """<AttachRolePolicyResponse>
|
||||
<ResponseMetadata>
|
||||
@ -1485,3 +1517,53 @@ GET_ACCOUNT_AUTHORIZATION_DETAILS_TEMPLATE = """<GetAccountAuthorizationDetailsR
|
||||
<RequestId>92e79ae7-7399-11e4-8c85-4b53eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</GetAccountAuthorizationDetailsResponse>"""
|
||||
|
||||
|
||||
UPLOAD_SIGNING_CERTIFICATE_TEMPLATE = """<UploadSigningCertificateResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
|
||||
<UploadSigningCertificateResult>
|
||||
<Certificate>
|
||||
<UserName>{{ cert.user_name }}</UserName>
|
||||
<CertificateId>{{ cert.id }}</CertificateId>
|
||||
<CertificateBody>{{ cert.body }}</CertificateBody>
|
||||
<Status>{{ cert.status }}</Status>
|
||||
</Certificate>
|
||||
</UploadSigningCertificateResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</UploadSigningCertificateResponse>"""
|
||||
|
||||
|
||||
UPDATE_SIGNING_CERTIFICATE_TEMPLATE = """<UpdateSigningCertificateResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
|
||||
<ResponseMetadata>
|
||||
<RequestId>EXAMPLE8-90ab-cdef-fedc-ba987EXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</UpdateSigningCertificateResponse>"""
|
||||
|
||||
|
||||
DELETE_SIGNING_CERTIFICATE_TEMPLATE = """<DeleteSigningCertificateResponse xmlns="https://iam.amazonaws.com/doc/2010-05-08/">
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</DeleteSigningCertificateResponse>"""
|
||||
|
||||
|
||||
LIST_SIGNING_CERTIFICATES_TEMPLATE = """<ListSigningCertificatesResponse>
|
||||
<ListSigningCertificatesResult>
|
||||
<UserName>{{ user_name }}</UserName>
|
||||
<Certificates>
|
||||
{% for cert in certificates %}
|
||||
<member>
|
||||
<UserName>{{ user_name }}</UserName>
|
||||
<CertificateId>{{ cert.id }}</CertificateId>
|
||||
<CertificateBody>{{ cert.body }}</CertificateBody>
|
||||
<Status>{{ cert.status }}</Status>
|
||||
</member>
|
||||
{% endfor %}
|
||||
</Certificates>
|
||||
<IsTruncated>false</IsTruncated>
|
||||
</ListSigningCertificatesResult>
|
||||
<ResponseMetadata>
|
||||
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
|
||||
</ResponseMetadata>
|
||||
</ListSigningCertificatesResponse>"""
|
||||
|
@ -12,8 +12,7 @@ def random_alphanumeric(length):
|
||||
)
|
||||
|
||||
|
||||
def random_resource_id():
|
||||
size = 20
|
||||
def random_resource_id(size=20):
|
||||
chars = list(range(10)) + list(string.ascii_lowercase)
|
||||
|
||||
return ''.join(six.text_type(random.choice(chars)) for x in range(size))
|
||||
|
@ -14,6 +14,19 @@ from nose.tools import raises
|
||||
from tests.helpers import requires_boto_gte
|
||||
|
||||
|
||||
MOCK_CERT = """-----BEGIN CERTIFICATE-----
|
||||
MIIBpzCCARACCQCY5yOdxCTrGjANBgkqhkiG9w0BAQsFADAXMRUwEwYDVQQKDAxt
|
||||
b3RvIHRlc3RpbmcwIBcNMTgxMTA1MTkwNTIwWhgPMjI5MjA4MTkxOTA1MjBaMBcx
|
||||
FTATBgNVBAoMDG1vdG8gdGVzdGluZzCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
|
||||
gYEA1Jn3g2h7LD3FLqdpcYNbFXCS4V4eDpuTCje9vKFcC3pi/01147X3zdfPy8Mt
|
||||
ZhKxcREOwm4NXykh23P9KW7fBovpNwnbYsbPqj8Hf1ZaClrgku1arTVhEnKjx8zO
|
||||
vaR/bVLCss4uE0E0VM1tJn/QGQsfthFsjuHtwx8uIWz35tUCAwEAATANBgkqhkiG
|
||||
9w0BAQsFAAOBgQBWdOQ7bDc2nWkUhFjZoNIZrqjyNdjlMUndpwREVD7FQ/DuxJMj
|
||||
FyDHrtlrS80dPUQWNYHw++oACDpWO01LGLPPrGmuO/7cOdojPEd852q5gd+7W9xt
|
||||
8vUH+pBa6IBLbvBp+szli51V3TLSWcoyy4ceJNQU2vCkTLoFdS0RLd/7tQ==
|
||||
-----END CERTIFICATE-----"""
|
||||
|
||||
|
||||
@mock_iam_deprecated()
|
||||
def test_get_all_server_certs():
|
||||
conn = boto.connect_iam()
|
||||
@ -763,3 +776,66 @@ def test_get_account_authorization_details():
|
||||
assert len(result['UserDetailList']) == 1
|
||||
assert len(result['GroupDetailList']) == 1
|
||||
assert len(result['Policies']) > 1
|
||||
|
||||
|
||||
@mock_iam
|
||||
def test_signing_certs():
|
||||
client = boto3.client('iam', region_name='us-east-1')
|
||||
|
||||
# Create the IAM user first:
|
||||
client.create_user(UserName='testing')
|
||||
|
||||
# Upload the cert:
|
||||
resp = client.upload_signing_certificate(UserName='testing', CertificateBody=MOCK_CERT)['Certificate']
|
||||
cert_id = resp['CertificateId']
|
||||
|
||||
assert resp['UserName'] == 'testing'
|
||||
assert resp['Status'] == 'Active'
|
||||
assert resp['CertificateBody'] == MOCK_CERT
|
||||
assert resp['CertificateId']
|
||||
|
||||
# Upload a the cert with an invalid body:
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.upload_signing_certificate(UserName='testing', CertificateBody='notacert')
|
||||
assert ce.exception.response['Error']['Code'] == 'MalformedCertificate'
|
||||
|
||||
# Upload with an invalid user:
|
||||
with assert_raises(ClientError):
|
||||
client.upload_signing_certificate(UserName='notauser', CertificateBody=MOCK_CERT)
|
||||
|
||||
# Update:
|
||||
client.update_signing_certificate(UserName='testing', CertificateId=cert_id, Status='Inactive')
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.update_signing_certificate(UserName='notauser', CertificateId=cert_id, Status='Inactive')
|
||||
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.update_signing_certificate(UserName='testing', CertificateId='x' * 32, Status='Inactive')
|
||||
|
||||
assert ce.exception.response['Error']['Message'] == 'The Certificate with id {id} cannot be found.'.format(
|
||||
id='x' * 32)
|
||||
|
||||
# List the certs:
|
||||
resp = client.list_signing_certificates(UserName='testing')['Certificates']
|
||||
assert len(resp) == 1
|
||||
assert resp[0]['CertificateBody'] == MOCK_CERT
|
||||
assert resp[0]['Status'] == 'Inactive' # Changed with the update call above.
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.list_signing_certificates(UserName='notauser')
|
||||
|
||||
# Delete:
|
||||
client.delete_signing_certificate(UserName='testing', CertificateId=cert_id)
|
||||
|
||||
with assert_raises(ClientError):
|
||||
client.delete_signing_certificate(UserName='notauser', CertificateId=cert_id)
|
||||
|
||||
with assert_raises(ClientError) as ce:
|
||||
client.delete_signing_certificate(UserName='testing', CertificateId=cert_id)
|
||||
|
||||
assert ce.exception.response['Error']['Message'] == 'The Certificate with id {id} cannot be found.'.format(
|
||||
id=cert_id)
|
||||
|
||||
# Verify that it's not in the list:
|
||||
resp = client.list_signing_certificates(UserName='testing')
|
||||
assert not resp['Certificates']
|
||||
|
Loading…
Reference in New Issue
Block a user