Merge pull request #1234 from dbfr3qs/master
add basic awslambda get_policy
This commit is contained in:
		
						commit
						40fdbd4907
					
				| @ -132,6 +132,7 @@ class LambdaFunction(BaseModel): | ||||
|         self.logs_backend = logs_backends[self.region] | ||||
|         self.environment_vars = spec.get('Environment', {}).get('Variables', {}) | ||||
|         self.docker_client = docker.from_env() | ||||
|         self.policy = "" | ||||
| 
 | ||||
|         # Unfortunately mocking replaces this method w/o fallback enabled, so we | ||||
|         # need to replace it if we detect it's been mocked | ||||
| @ -527,6 +528,9 @@ class LambdaBackend(BaseBackend): | ||||
|                 pass | ||||
|                 # Don't care | ||||
| 
 | ||||
|     def add_policy(self, function_name, policy): | ||||
|         self.get_function(function_name).policy = policy | ||||
| 
 | ||||
| 
 | ||||
| def do_validate_s3(): | ||||
|     return os.environ.get('VALIDATE_LAMBDA_S3', '') in ['', '1', 'true'] | ||||
|  | ||||
| @ -57,6 +57,35 @@ class LambdaResponse(BaseResponse): | ||||
|         else: | ||||
|             raise ValueError("Cannot handle {0} request".format(request.method)) | ||||
| 
 | ||||
|     def policy(self, request, full_url, headers): | ||||
|         if request.method == 'GET': | ||||
|             return self._get_policy(request, full_url, headers) | ||||
|         if request.method == 'POST': | ||||
|             return self._add_policy(request, full_url, headers) | ||||
| 
 | ||||
|     def _add_policy(self, request, full_url, headers): | ||||
|         lambda_backend = self.get_lambda_backend(full_url) | ||||
| 
 | ||||
|         path = request.path if hasattr(request, 'path') else request.path_url | ||||
|         function_name = path.split('/')[-2] | ||||
|         if lambda_backend.has_function(function_name): | ||||
|             policy = request.body.decode('utf8') | ||||
|             lambda_backend.add_policy(function_name, policy) | ||||
|             return 200, {}, json.dumps(dict(Statement=policy)) | ||||
|         else: | ||||
|             return 404, {}, "{}" | ||||
| 
 | ||||
|     def _get_policy(self, request, full_url, headers): | ||||
|         lambda_backend = self.get_lambda_backend(full_url) | ||||
| 
 | ||||
|         path = request.path if hasattr(request, 'path') else request.path_url | ||||
|         function_name = path.split('/')[-2] | ||||
|         if lambda_backend.has_function(function_name): | ||||
|             function = lambda_backend.get_function(function_name) | ||||
|             return 200, {}, json.dumps(dict(Policy="{\"Statement\":[" + function.policy + "]}")) | ||||
|         else: | ||||
|             return 404, {}, "{}" | ||||
| 
 | ||||
|     def _invoke(self, request, full_url): | ||||
|         response_headers = {} | ||||
|         lambda_backend = self.get_lambda_backend(full_url) | ||||
|  | ||||
| @ -12,5 +12,6 @@ url_paths = { | ||||
|     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/?$': response.function, | ||||
|     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invocations/?$': response.invoke, | ||||
|     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/invoke-async/?$': response.invoke_async, | ||||
|     r'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag | ||||
|     r'{0}/(?P<api_version>[^/]+)/tags/(?P<resource_arn>.+)': response.tag, | ||||
|     r'{0}/(?P<api_version>[^/]+)/functions/(?P<function_name>[\w_-]+)/policy/?$': response.policy | ||||
| } | ||||
|  | ||||
| @ -645,3 +645,74 @@ def test_get_function_created_with_zipfile(): | ||||
|             } | ||||
|         }, | ||||
|     ) | ||||
| 
 | ||||
| @mock_lambda | ||||
| def add_function_permission(): | ||||
|     conn = boto3.client('lambda', 'us-west-2') | ||||
|     zip_content = get_test_zip_file1() | ||||
|     result = conn.create_function( | ||||
|         FunctionName='testFunction', | ||||
|         Runtime='python2.7', | ||||
|         Role='test-iam-role', | ||||
|         Handler='lambda_function.handler', | ||||
|         Code={ | ||||
|             'ZipFile': zip_content, | ||||
|         }, | ||||
|         Description='test lambda function', | ||||
|         Timeout=3, | ||||
|         MemorySize=128, | ||||
|         Publish=True, | ||||
|     ) | ||||
| 
 | ||||
|     response = conn.add_permission( | ||||
|         FunctionName='testFunction', | ||||
|         StatementId='1', | ||||
|         Action="lambda:InvokeFunction", | ||||
|         Principal='432143214321', | ||||
|         SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld", | ||||
|         SourceAccount='123412341234', | ||||
|         EventSourceToken='blah', | ||||
|         Qualifier='2' | ||||
|     ) | ||||
|     assert 'Statement' in response | ||||
|     res = json.loads(response['Statement']) | ||||
|     assert res['Action'] == "lambda:InvokeFunction" | ||||
| 
 | ||||
| 
 | ||||
| @mock_lambda | ||||
| def get_function_policy(): | ||||
|     conn = boto3.client('lambda', 'us-west-2') | ||||
|     zip_content = get_test_zip_file1() | ||||
|     result = conn.create_function( | ||||
|         FunctionName='testFunction', | ||||
|         Runtime='python2.7', | ||||
|         Role='test-iam-role', | ||||
|         Handler='lambda_function.handler', | ||||
|         Code={ | ||||
|             'ZipFile': zip_content, | ||||
|         }, | ||||
|         Description='test lambda function', | ||||
|         Timeout=3, | ||||
|         MemorySize=128, | ||||
|         Publish=True, | ||||
|     ) | ||||
| 
 | ||||
|     response = conn.add_permission( | ||||
|         FunctionName='testFunction', | ||||
|         StatementId='1', | ||||
|         Action="lambda:InvokeFunction", | ||||
|         Principal='432143214321', | ||||
|         SourceArn="arn:aws:lambda:us-west-2:account-id:function:helloworld", | ||||
|         SourceAccount='123412341234', | ||||
|         EventSourceToken='blah', | ||||
|         Qualifier='2' | ||||
|     ) | ||||
| 
 | ||||
|     response = conn.get_policy( | ||||
|         FunctionName='testFunction' | ||||
|     ) | ||||
| 
 | ||||
|     assert 'Policy' in response | ||||
|     assert isinstance(response['Policy'], str) | ||||
|     res = json.loads(response['Policy']) | ||||
|     assert res['Statement'][0]['Action'] == 'lambda:InvokeFunction' | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user