Merge pull request #295 from mfulleratlassian/adding_iam_credentials_report

Adding iam credentials report
This commit is contained in:
Steve Pulec 2015-02-05 08:36:38 -05:00
commit 8a8aba3395
3 changed files with 121 additions and 3 deletions

View File

@ -4,7 +4,7 @@ from boto.exception import BotoServerError
from moto.core import BaseBackend
from .utils import random_access_key, random_alphanumeric, random_resource_id
from datetime import datetime
import base64
class Role(object):
@ -135,7 +135,7 @@ class User(object):
datetime.utcnow(),
"%Y-%m-%d-%H-%M-%S"
)
self.arn = 'arn:aws:iam::123456789012:user/{0}'.format(name)
self.policies = {}
self.access_keys = []
self.password = None
@ -184,6 +184,45 @@ class User(object):
raise NotImplementedError('"Fn::GetAtt" : [ "{0}" , "Arn" ]"')
raise UnformattedGetAttTemplateException()
def to_csv(self):
date_format = '%Y-%m-%dT%H:%M:%S+00:00'
date_created = datetime.strptime(self.created, '%Y-%m-%d-%H-%M-%S')
# aagrawal,arn:aws:iam::509284790694:user/aagrawal,2014-09-01T22:28:48+00:00,true,2014-11-12T23:36:49+00:00,2014-09-03T18:59:00+00:00,N/A,false,true,2014-09-01T22:28:48+00:00,false,N/A,false,N/A,false,N/A
if not self.password:
password_enabled = 'false'
password_last_used = 'not_supported'
else:
password_enabled = 'true'
password_last_used = 'no_information'
if len(self.access_keys) == 0:
access_key_1_active = 'false'
access_key_1_last_rotated = 'N/A'
access_key_2_active = 'false'
access_key_2_last_rotated = 'N/A'
elif len(self.access_keys) == 1:
access_key_1_active = 'true'
access_key_1_last_rotated = date_created.strftime(date_format)
access_key_2_active = 'false'
access_key_2_last_rotated = 'N/A'
else:
access_key_1_active = 'true'
access_key_1_last_rotated = date_created.strftime(date_format)
access_key_2_active = 'true'
access_key_2_last_rotated = date_created.strftime(date_format)
return '{0},{1},{2},{3},{4},{5},not_supported,false,{6},{7},{8},{9},false,N/A,false,N/A'.format(self.name,
self.arn,
date_created.strftime(date_format),
password_enabled,
password_last_used,
date_created.strftime(date_format),
access_key_1_active,
access_key_1_last_rotated,
access_key_2_active,
access_key_2_last_rotated
)
class IAMBackend(BaseBackend):
@ -193,6 +232,7 @@ class IAMBackend(BaseBackend):
self.certificates = {}
self.groups = {}
self.users = {}
self.credential_report = None
super(IAMBackend, self).__init__()
def create_role(self, role_name, assume_role_policy_document, path):
@ -394,5 +434,18 @@ class IAMBackend(BaseBackend):
except KeyError:
raise BotoServerError(404, 'Not Found')
def report_generated(self):
return self.credential_report
def generate_report(self):
self.credential_report = True
def get_credential_report(self):
if not self.credential_report:
raise BotoServerError(410, 'ReportNotPresent')
report = 'user,arn,user_creation_time,password_enabled,password_last_used,password_last_changed,password_next_rotation,mfa_active,access_key_1_active,access_key_1_last_rotated,access_key_2_active,access_key_2_last_rotated,cert_1_active,cert_1_last_rotated,cert_2_active,cert_2_last_rotated\n'
for user in self.users:
report += self.users[user].to_csv()
return base64.b64encode(report.encode('ascii')).decode('ascii')
iam_backend = IAMBackend()

View File

@ -219,6 +219,18 @@ class IamResponse(BaseResponse):
template = self.response_template(GENERIC_EMPTY_TEMPLATE)
return template.render(name='DeleteUser')
def generate_credential_report(self):
if iam_backend.report_generated():
template = self.response_template(CREDENTIAL_REPORT_GENERATED)
else:
template = self.response_template(CREDENTIAL_REPORT_GENERATING)
iam_backend.generate_report()
return template.render()
def get_credential_report(self):
report = iam_backend.get_credential_report()
template = self.response_template(CREDENTIAL_REPORT)
return template.render(report=report)
GENERIC_EMPTY_TEMPLATE = """<{{ name }}Response>
<ResponseMetadata>
@ -559,3 +571,34 @@ LIST_ACCESS_KEYS_TEMPLATE = """<ListAccessKeysResponse>
<RequestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</RequestId>
</ResponseMetadata>
</ListAccessKeysResponse>"""
CREDENTIAL_REPORT_GENERATING = """
<GenerateCredentialReportResponse>
<GenerateCredentialReportResult>
<state>STARTED</state>
<description>No report exists. Starting a new report generation task</description>
</GenerateCredentialReportResult>
<ResponseMetadata>
<RequestId>fa788a82-aa8a-11e4-a278-1786c418872b"</RequestId>
</ResponseMetadata>
</GenerateCredentialReportResponse>"""
CREDENTIAL_REPORT_GENERATED = """<GenerateCredentialReportResponse>
<GenerateCredentialReportResult>
<state>COMPLETE</state>
</GenerateCredentialReportResult>
<ResponseMetadata>
<RequestId>fa788a82-aa8a-11e4-a278-1786c418872b"</RequestId>
</ResponseMetadata>
</GenerateCredentialReportResponse>"""
CREDENTIAL_REPORT = """<GetCredentialReportResponse>
<GetCredentialReportResult>
<content>{{ report }}</content>
<GeneratedTime>2015-02-02T20:02:02Z</GeneratedTime>
<ReportFormat>text/csv</ReportFormat>
</GetCredentialReportResult>
<ResponseMetadata>
<RequestId>fa788a82-aa8a-11e4-a278-1786c418872b"</RequestId>
</ResponseMetadata>
</GetCredentialReportResponse>"""

View File

@ -1,10 +1,11 @@
from __future__ import unicode_literals
import boto
import sure # noqa
import re
from nose.tools import assert_raises, assert_equals, assert_not_equals
from boto.exception import BotoServerError
import base64
from moto import mock_iam
@ -200,3 +201,24 @@ def test_delete_user():
conn.delete_user('my-user')
conn.create_user('my-user')
conn.delete_user('my-user')
@mock_iam()
def test_generate_credential_report():
conn = boto.connect_iam()
result = conn.generate_credential_report()
result['generate_credential_report_response']['generate_credential_report_result']['state'].should.equal('STARTED')
result = conn.generate_credential_report()
result['generate_credential_report_response']['generate_credential_report_result']['state'].should.equal('COMPLETE')
@mock_iam()
def test_get_credential_report():
conn = boto.connect_iam()
conn.create_user('my-user')
with assert_raises(BotoServerError):
conn.get_credential_report()
result = conn.generate_credential_report()
while result['generate_credential_report_response']['generate_credential_report_result']['state'] != 'COMPLETE':
result = conn.generate_credential_report()
result = conn.get_credential_report()
report = base64.b64decode(result['get_credential_report_response']['get_credential_report_result']['content'].encode('ascii')).decode('ascii')
report.should.match(r'.*my-user.*')