cognito-idp – Added format validation and implemented prefix operator for Filter param of list_users (#4388)
This commit is contained in:
		
							parent
							
								
									476fd895b0
								
							
						
					
					
						commit
						c60fb068e1
					
				| @ -2,6 +2,7 @@ from __future__ import unicode_literals | ||||
| 
 | ||||
| import json | ||||
| import os | ||||
| import re | ||||
| 
 | ||||
| from moto.core.responses import BaseResponse | ||||
| from .models import cognitoidp_backends, find_region_by_value, UserStatus | ||||
| @ -332,18 +333,25 @@ class CognitoIdpResponse(BaseResponse): | ||||
|                 "status": lambda u: "Enabled" if u.enabled else "Disabled", | ||||
|                 "username": lambda u: u.username, | ||||
|             } | ||||
|             name, value = filt.replace('"', "").replace(" ", "").split("=") | ||||
|             comparisons = {"=": lambda x, y: x == y, "^=": lambda x, y: x.startswith(y)} | ||||
| 
 | ||||
|             match = re.match(r"([\w:]+)\s*(=|\^=)\s*\"(.*)\"", filt) | ||||
|             if match: | ||||
|                 name, op, value = match.groups() | ||||
|             else: | ||||
|                 raise InvalidParameterException("Error while parsing filter") | ||||
|             compare = comparisons[op] | ||||
|             users = [ | ||||
|                 user | ||||
|                 for user in users | ||||
|                 if [ | ||||
|                     attr | ||||
|                     for attr in user.attributes | ||||
|                     if attr["Name"] == name and attr["Value"] == value | ||||
|                     if attr["Name"] == name and compare(attr["Value"], value) | ||||
|                 ] | ||||
|                 or ( | ||||
|                     name in inherent_attributes | ||||
|                     and inherent_attributes[name](user) == value | ||||
|                     and compare(inherent_attributes[name](user), value) | ||||
|                 ) | ||||
|             ] | ||||
|         response = {"Users": [user.to_json(extended=True) for user in users]} | ||||
|  | ||||
| @ -1209,18 +1209,55 @@ def test_list_users(): | ||||
|         UserAttributes=[{"Name": "phone_number", "Value": "+33666666666"}], | ||||
|     ) | ||||
|     result = conn.list_users( | ||||
|         UserPoolId=user_pool_id, Filter='phone_number="+33666666666' | ||||
|         UserPoolId=user_pool_id, Filter='phone_number="+33666666666"' | ||||
|     ) | ||||
|     result["Users"].should.have.length_of(1) | ||||
|     result["Users"][0]["Username"].should.equal(username_bis) | ||||
| 
 | ||||
|     # checking Filter with space | ||||
|     result = conn.list_users( | ||||
|         UserPoolId=user_pool_id, Filter='phone_number = "+33666666666' | ||||
|         UserPoolId=user_pool_id, Filter='phone_number = "+33666666666"' | ||||
|     ) | ||||
|     result["Users"].should.have.length_of(1) | ||||
|     result["Users"][0]["Username"].should.equal(username_bis) | ||||
| 
 | ||||
|     user0_username = "user0@example.com" | ||||
|     conn.admin_create_user( | ||||
|         UserPoolId=user_pool_id, | ||||
|         Username=user0_username, | ||||
|         UserAttributes=[{"Name": "phone_number", "Value": "+48555555555"}], | ||||
|     ) | ||||
| 
 | ||||
|     # checking Filter with prefix operator | ||||
|     result = conn.list_users(UserPoolId=user_pool_id, Filter='phone_number ^= "+48"') | ||||
|     result["Users"].should.have.length_of(1) | ||||
|     result["Users"][0]["Username"].should.equal(user0_username) | ||||
| 
 | ||||
|     # empty value Filter should also be supported | ||||
|     result = conn.list_users(UserPoolId=user_pool_id, Filter='family_name=""') | ||||
|     result["Users"].should.have.length_of(0) | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
| def test_list_users_incorrect_filter(): | ||||
|     conn = boto3.client("cognito-idp", "us-west-2") | ||||
| 
 | ||||
|     user_pool_id = conn.create_user_pool(PoolName=str(uuid.uuid4()))["UserPool"]["Id"] | ||||
| 
 | ||||
|     with pytest.raises(conn.exceptions.InvalidParameterException) as exc: | ||||
|         conn.list_users(UserPoolId=user_pool_id, Filter="username = foo") | ||||
|     _assert_filter_parsing_error(exc) | ||||
| 
 | ||||
|     with pytest.raises(conn.exceptions.InvalidParameterException) as exc: | ||||
|         conn.list_users(UserPoolId=user_pool_id, Filter="username=") | ||||
|     _assert_filter_parsing_error(exc) | ||||
| 
 | ||||
| 
 | ||||
| def _assert_filter_parsing_error(exc): | ||||
|     err = exc.value.response["Error"] | ||||
|     assert err["Code"].should.equal("InvalidParameterException") | ||||
|     assert err["Message"].should.equal("Error while parsing filter") | ||||
| 
 | ||||
| 
 | ||||
| @mock_cognitoidp | ||||
| def test_list_users_inherent_attributes(): | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user