CognitoIDP: clean MFA settings capability and enable multiple MFA methods (#6853)
This commit is contained in:
		
							parent
							
								
									178f2b8c03
								
							
						
					
					
						commit
						9a84423187
					
				| @ -833,7 +833,7 @@ class CognitoIdpUser(BaseModel): | |||||||
|         user_mfa_setting_list = [] |         user_mfa_setting_list = [] | ||||||
|         if self.software_token_mfa_enabled: |         if self.software_token_mfa_enabled: | ||||||
|             user_mfa_setting_list.append("SOFTWARE_TOKEN_MFA") |             user_mfa_setting_list.append("SOFTWARE_TOKEN_MFA") | ||||||
|         elif self.sms_mfa_enabled: |         if self.sms_mfa_enabled: | ||||||
|             user_mfa_setting_list.append("SMS_MFA") |             user_mfa_setting_list.append("SMS_MFA") | ||||||
|         user_json = self._base_json() |         user_json = self._base_json() | ||||||
|         if extended: |         if extended: | ||||||
| @ -1981,26 +1981,13 @@ class CognitoIdpBackend(BaseBackend): | |||||||
|         for user_pool in self.user_pools.values(): |         for user_pool in self.user_pools.values(): | ||||||
|             if access_token in user_pool.access_tokens: |             if access_token in user_pool.access_tokens: | ||||||
|                 _, username = user_pool.access_tokens[access_token] |                 _, username = user_pool.access_tokens[access_token] | ||||||
|                 user = self.admin_get_user(user_pool.id, username) |  | ||||||
| 
 | 
 | ||||||
|                 if software_token_mfa_settings and software_token_mfa_settings.get( |                 return self.admin_set_user_mfa_preference( | ||||||
|                     "Enabled" |                     user_pool.id, | ||||||
|                 ): |                     username, | ||||||
|                     if user.token_verified: |                     software_token_mfa_settings, | ||||||
|                         user.software_token_mfa_enabled = True |                     sms_mfa_settings, | ||||||
|                     else: |                 ) | ||||||
|                         raise InvalidParameterException( |  | ||||||
|                             "User has not verified software token mfa" |  | ||||||
|                         ) |  | ||||||
| 
 |  | ||||||
|                     if software_token_mfa_settings.get("PreferredMfa"): |  | ||||||
|                         user.preferred_mfa_setting = "SOFTWARE_TOKEN_MFA" |  | ||||||
|                 elif sms_mfa_settings and sms_mfa_settings["Enabled"]: |  | ||||||
|                     user.sms_mfa_enabled = True |  | ||||||
| 
 |  | ||||||
|                     if sms_mfa_settings.get("PreferredMfa"): |  | ||||||
|                         user.preferred_mfa_setting = "SMS_MFA" |  | ||||||
|                 return None |  | ||||||
| 
 | 
 | ||||||
|         raise NotAuthorizedError(access_token) |         raise NotAuthorizedError(access_token) | ||||||
| 
 | 
 | ||||||
| @ -2013,21 +2000,33 @@ class CognitoIdpBackend(BaseBackend): | |||||||
|     ) -> None: |     ) -> None: | ||||||
|         user = self.admin_get_user(user_pool_id, username) |         user = self.admin_get_user(user_pool_id, username) | ||||||
| 
 | 
 | ||||||
|         if software_token_mfa_settings and software_token_mfa_settings.get("Enabled"): |         if software_token_mfa_settings: | ||||||
|             if user.token_verified: |             if software_token_mfa_settings.get("Enabled"): | ||||||
|                 user.software_token_mfa_enabled = True |                 if user.token_verified: | ||||||
|  |                     user.software_token_mfa_enabled = True | ||||||
|  |                 else: | ||||||
|  |                     raise InvalidParameterException( | ||||||
|  |                         "User has not verified software token mfa" | ||||||
|  |                     ) | ||||||
|             else: |             else: | ||||||
|                 raise InvalidParameterException( |                 user.software_token_mfa_enabled = False | ||||||
|                     "User has not verified software token mfa" |  | ||||||
|                 ) |  | ||||||
| 
 | 
 | ||||||
|             if software_token_mfa_settings.get("PreferredMfa"): |             if software_token_mfa_settings.get("PreferredMfa"): | ||||||
|                 user.preferred_mfa_setting = "SOFTWARE_TOKEN_MFA" |                 user.preferred_mfa_setting = "SOFTWARE_TOKEN_MFA" | ||||||
|         elif sms_mfa_settings and sms_mfa_settings.get("Enabled"): |             elif user.preferred_mfa_setting != "SMS_MFA": | ||||||
|             user.sms_mfa_enabled = True |                 user.preferred_mfa_setting = "" | ||||||
|  | 
 | ||||||
|  |         if sms_mfa_settings: | ||||||
|  |             if sms_mfa_settings.get("Enabled"): | ||||||
|  |                 user.sms_mfa_enabled = True | ||||||
|  |             else: | ||||||
|  |                 user.sms_mfa_enabled = False | ||||||
| 
 | 
 | ||||||
|             if sms_mfa_settings.get("PreferredMfa"): |             if sms_mfa_settings.get("PreferredMfa"): | ||||||
|                 user.preferred_mfa_setting = "SMS_MFA" |                 user.preferred_mfa_setting = "SMS_MFA" | ||||||
|  |             elif user.preferred_mfa_setting != "SOFTWARE_TOKEN_MFA": | ||||||
|  |                 user.preferred_mfa_setting = "" | ||||||
|  | 
 | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|     def _validate_password(self, user_pool_id: str, password: str) -> None: |     def _validate_password(self, user_pool_id: str, password: str) -> None: | ||||||
|  | |||||||
| @ -4235,6 +4235,8 @@ def test_setting_mfa(): | |||||||
| 
 | 
 | ||||||
|     for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: |     for auth_flow in ["ADMIN_NO_SRP_AUTH", "ADMIN_USER_PASSWORD_AUTH"]: | ||||||
|         result = authentication_flow(conn, auth_flow) |         result = authentication_flow(conn, auth_flow) | ||||||
|  | 
 | ||||||
|  |         # Set MFA method | ||||||
|         conn.associate_software_token(AccessToken=result["access_token"]) |         conn.associate_software_token(AccessToken=result["access_token"]) | ||||||
|         conn.verify_software_token( |         conn.verify_software_token( | ||||||
|             AccessToken=result["access_token"], UserCode="123456" |             AccessToken=result["access_token"], UserCode="123456" | ||||||
| @ -4243,12 +4245,24 @@ def test_setting_mfa(): | |||||||
|             AccessToken=result["access_token"], |             AccessToken=result["access_token"], | ||||||
|             SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, |             SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, | ||||||
|         ) |         ) | ||||||
|         result = conn.admin_get_user( |         user = conn.admin_get_user( | ||||||
|             UserPoolId=result["user_pool_id"], Username=result["username"] |             UserPoolId=result["user_pool_id"], Username=result["username"] | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         assert len(result["UserMFASettingList"]) == 1 |         assert len(user["UserMFASettingList"]) == 1 | ||||||
|         assert result["PreferredMfaSetting"] == "SOFTWARE_TOKEN_MFA" |         assert user["PreferredMfaSetting"] == "SOFTWARE_TOKEN_MFA" | ||||||
|  | 
 | ||||||
|  |         # Unset MFA method | ||||||
|  |         conn.set_user_mfa_preference( | ||||||
|  |             AccessToken=result["access_token"], | ||||||
|  |             SoftwareTokenMfaSettings={"Enabled": False, "PreferredMfa": False}, | ||||||
|  |         ) | ||||||
|  |         user = conn.admin_get_user( | ||||||
|  |             UserPoolId=result["user_pool_id"], Username=result["username"] | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         assert len(user["UserMFASettingList"]) == 0 | ||||||
|  |         assert user["PreferredMfaSetting"] == "" | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @mock_cognitoidp | @mock_cognitoidp | ||||||
| @ -4269,7 +4283,7 @@ def test_setting_mfa_when_token_not_verified(): | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @mock_cognitoidp | @mock_cognitoidp | ||||||
| def test_admin_setting_mfa(): | def test_admin_setting_single_mfa(): | ||||||
|     conn = boto3.client("cognito-idp", "us-west-2") |     conn = boto3.client("cognito-idp", "us-west-2") | ||||||
| 
 | 
 | ||||||
|     user_pool_id = conn.create_user_pool( |     user_pool_id = conn.create_user_pool( | ||||||
| @ -4278,6 +4292,7 @@ def test_admin_setting_mfa(): | |||||||
|     username = "test@example.com" |     username = "test@example.com" | ||||||
|     conn.admin_create_user(UserPoolId=user_pool_id, Username=username) |     conn.admin_create_user(UserPoolId=user_pool_id, Username=username) | ||||||
| 
 | 
 | ||||||
|  |     # Set MFA SMS method | ||||||
|     conn.admin_set_user_mfa_preference( |     conn.admin_set_user_mfa_preference( | ||||||
|         Username=username, |         Username=username, | ||||||
|         UserPoolId=user_pool_id, |         UserPoolId=user_pool_id, | ||||||
| @ -4287,6 +4302,50 @@ def test_admin_setting_mfa(): | |||||||
|     assert len(result["UserMFASettingList"]) == 1 |     assert len(result["UserMFASettingList"]) == 1 | ||||||
|     assert result["PreferredMfaSetting"] == "SMS_MFA" |     assert result["PreferredMfaSetting"] == "SMS_MFA" | ||||||
| 
 | 
 | ||||||
|  |     # Unset MFA SMS method | ||||||
|  |     conn.admin_set_user_mfa_preference( | ||||||
|  |         Username=username, | ||||||
|  |         UserPoolId=user_pool_id, | ||||||
|  |         SMSMfaSettings={"Enabled": False, "PreferredMfa": False}, | ||||||
|  |     ) | ||||||
|  |     result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) | ||||||
|  |     assert len(result["UserMFASettingList"]) == 0 | ||||||
|  |     assert result["PreferredMfaSetting"] == "" | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @mock_cognitoidp | ||||||
|  | def test_admin_setting_mfa_totp_and_sms(): | ||||||
|  |     conn = boto3.client("cognito-idp", "us-west-2") | ||||||
|  | 
 | ||||||
|  |     result = authentication_flow(conn, "ADMIN_NO_SRP_AUTH") | ||||||
|  |     access_token = result["access_token"] | ||||||
|  |     user_pool_id = result["user_pool_id"] | ||||||
|  |     username = result["username"] | ||||||
|  |     conn.associate_software_token(AccessToken=access_token) | ||||||
|  |     conn.verify_software_token(AccessToken=access_token, UserCode="123456") | ||||||
|  | 
 | ||||||
|  |     # Set MFA TOTP and SMS methods | ||||||
|  |     conn.admin_set_user_mfa_preference( | ||||||
|  |         Username=username, | ||||||
|  |         UserPoolId=user_pool_id, | ||||||
|  |         SoftwareTokenMfaSettings={"Enabled": True, "PreferredMfa": True}, | ||||||
|  |         SMSMfaSettings={"Enabled": True, "PreferredMfa": False}, | ||||||
|  |     ) | ||||||
|  |     result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) | ||||||
|  |     assert len(result["UserMFASettingList"]) == 2 | ||||||
|  |     assert result["PreferredMfaSetting"] == "SOFTWARE_TOKEN_MFA" | ||||||
|  | 
 | ||||||
|  |     # Unset MFA TOTP and SMS methods | ||||||
|  |     conn.admin_set_user_mfa_preference( | ||||||
|  |         Username=username, | ||||||
|  |         UserPoolId=user_pool_id, | ||||||
|  |         SoftwareTokenMfaSettings={"Enabled": False, "PreferredMfa": False}, | ||||||
|  |         SMSMfaSettings={"Enabled": False, "PreferredMfa": False}, | ||||||
|  |     ) | ||||||
|  |     result = conn.admin_get_user(UserPoolId=user_pool_id, Username=username) | ||||||
|  |     assert len(result["UserMFASettingList"]) == 0 | ||||||
|  |     assert result["PreferredMfaSetting"] == "" | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| @mock_cognitoidp | @mock_cognitoidp | ||||||
| def test_admin_setting_mfa_when_token_not_verified(): | def test_admin_setting_mfa_when_token_not_verified(): | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user