From f1ba6554b297d3d7dee1edc0f9d8f216120ab7f3 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Sun, 11 Jun 2023 18:44:30 +0000 Subject: [PATCH] Techdebt: Replace sure with regular asserts in AWSLambda tests (#6393) --- tests/test_appsync/test_appsync.py | 4 +- .../test_awslambda_cloudformation.py | 54 +- tests/test_awslambda/test_lambda.py | 475 +++++++++--------- tests/test_awslambda/test_lambda_alias.py | 113 +++-- .../test_awslambda/test_lambda_concurrency.py | 7 +- .../test_lambda_eventsourcemapping.py | 15 +- .../test_lambda_function_urls.py | 32 +- tests/test_awslambda/test_lambda_invoke.py | 91 ++-- tests/test_awslambda/test_lambda_layers.py | 73 ++- tests/test_awslambda/test_lambda_policy.py | 41 +- tests/test_awslambda/test_lambda_tags.py | 68 ++- tests/test_awslambda/test_policy.py | 5 +- 12 files changed, 490 insertions(+), 488 deletions(-) diff --git a/tests/test_appsync/test_appsync.py b/tests/test_appsync/test_appsync.py index b588900ea..97bcba9dc 100644 --- a/tests/test_appsync/test_appsync.py +++ b/tests/test_appsync/test_appsync.py @@ -25,8 +25,8 @@ def test_create_graphql_api(): ) assert api["uris"] == {"GRAPHQL": "http://graphql.uri"} assert api["xrayEnabled"] is False - api.shouldnt.have.key("additionalAuthenticationProviders") - api.shouldnt.have.key("logConfig") + assert "additionalAuthenticationProviders" not in api + assert "logConfig" not in api @mock_appsync diff --git a/tests/test_awslambda/test_awslambda_cloudformation.py b/tests/test_awslambda/test_awslambda_cloudformation.py index 0aed87a78..3ef4bea4d 100644 --- a/tests/test_awslambda/test_awslambda_cloudformation.py +++ b/tests/test_awslambda/test_awslambda_cloudformation.py @@ -1,6 +1,5 @@ import boto3 import io -import sure # noqa # pylint: disable=unused-import import zipfile from botocore.exceptions import ClientError from moto import mock_cloudformation, mock_iam, mock_lambda, mock_s3, mock_sqs @@ -83,24 +82,21 @@ def test_lambda_can_be_updated_by_cloudformation(): created_fn_name = get_created_function_name(cf, stack) # Verify function has been created created_fn = lmbda.get_function(FunctionName=created_fn_name) - created_fn["Configuration"]["Handler"].should.equal( - "lambda_function.lambda_handler1" - ) - created_fn["Configuration"]["Runtime"].should.equal("python3.7") - created_fn["Code"]["Location"].should.match("/test1.zip") + assert created_fn["Configuration"]["Handler"] == "lambda_function.lambda_handler1" + assert created_fn["Configuration"]["Runtime"] == "python3.7" + assert "/test1.zip" in created_fn["Code"]["Location"] # Update CF stack cf.update_stack(StackName=stack_name, TemplateBody=body2) updated_fn_name = get_created_function_name(cf, stack) # Verify function has been updated updated_fn = lmbda.get_function(FunctionName=updated_fn_name) - updated_fn["Configuration"]["FunctionArn"].should.equal( - created_fn["Configuration"]["FunctionArn"] + assert ( + updated_fn["Configuration"]["FunctionArn"] + == created_fn["Configuration"]["FunctionArn"] ) - updated_fn["Configuration"]["Handler"].should.equal( - "lambda_function.lambda_handler2" - ) - updated_fn["Configuration"]["Runtime"].should.equal("python3.8") - updated_fn["Code"]["Location"].should.match("/test2.zip") + assert updated_fn["Configuration"]["Handler"] == "lambda_function.lambda_handler2" + assert updated_fn["Configuration"]["Runtime"] == "python3.8" + assert "/test2.zip" in updated_fn["Code"]["Location"] @mock_cloudformation @@ -117,7 +113,7 @@ def test_lambda_can_be_deleted_by_cloudformation(): # Verify function was deleted with pytest.raises(ClientError) as e: lmbda.get_function(FunctionName=created_fn_name) - e.value.response["Error"]["Code"].should.equal("ResourceNotFoundException") + assert e.value.response["Error"]["Code"] == "ResourceNotFoundException" @mock_cloudformation @@ -152,10 +148,10 @@ def test_event_source_mapping_create_from_cloudformation_json(): cf.create_stack(StackName=random_stack_name(), TemplateBody=esm_template) event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) - event_sources["EventSourceMappings"].should.have.length_of(1) + assert len(event_sources["EventSourceMappings"]) == 1 event_source = event_sources["EventSourceMappings"][0] - event_source["EventSourceArn"].should.be.equal(queue.attributes["QueueArn"]) - event_source["FunctionArn"].should.be.equal(created_fn_arn) + assert event_source["EventSourceArn"] == queue.attributes["QueueArn"] + assert event_source["FunctionArn"] == created_fn_arn @mock_cloudformation @@ -189,12 +185,12 @@ def test_event_source_mapping_delete_stack(): ) event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) - event_sources["EventSourceMappings"].should.have.length_of(1) + assert len(event_sources["EventSourceMappings"]) == 1 cf.delete_stack(StackName=esm_stack["StackId"]) event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) - event_sources["EventSourceMappings"].should.have.length_of(0) + assert len(event_sources["EventSourceMappings"]) == 0 @mock_cloudformation @@ -228,8 +224,8 @@ def test_event_source_mapping_update_from_cloudformation_json(): event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) original_esm = event_sources["EventSourceMappings"][0] - original_esm["State"].should.equal("Enabled") - original_esm["BatchSize"].should.equal(1) + assert original_esm["State"] == "Enabled" + assert original_esm["BatchSize"] == 1 # Update new_template = event_source_mapping_template.substitute( @@ -246,8 +242,8 @@ def test_event_source_mapping_update_from_cloudformation_json(): event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) updated_esm = event_sources["EventSourceMappings"][0] - updated_esm["State"].should.equal("Disabled") - updated_esm["BatchSize"].should.equal(10) + assert updated_esm["State"] == "Disabled" + assert updated_esm["BatchSize"] == 10 @mock_cloudformation @@ -281,8 +277,8 @@ def test_event_source_mapping_delete_from_cloudformation_json(): event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) original_esm = event_sources["EventSourceMappings"][0] - original_esm["State"].should.equal("Enabled") - original_esm["BatchSize"].should.equal(1) + assert original_esm["State"] == "Enabled" + assert original_esm["BatchSize"] == 1 # Update with deletion of old resources new_template = event_source_mapping_template.substitute( @@ -298,12 +294,12 @@ def test_event_source_mapping_delete_from_cloudformation_json(): cf.update_stack(StackName=stack_name, TemplateBody=new_template) event_sources = lmbda.list_event_source_mappings(FunctionName=created_fn_name) - event_sources["EventSourceMappings"].should.have.length_of(1) + assert len(event_sources["EventSourceMappings"]) == 1 updated_esm = event_sources["EventSourceMappings"][0] - updated_esm["State"].should.equal("Disabled") - updated_esm["BatchSize"].should.equal(10) - updated_esm["UUID"].shouldnt.equal(original_esm["UUID"]) + assert updated_esm["State"] == "Disabled" + assert updated_esm["BatchSize"] == 10 + assert updated_esm["UUID"] != original_esm["UUID"] def create_stack(cf, s3): diff --git a/tests/test_awslambda/test_lambda.py b/tests/test_awslambda/test_lambda.py index 569c21c3e..7b3fb6782 100644 --- a/tests/test_awslambda/test_lambda.py +++ b/tests/test_awslambda/test_lambda.py @@ -2,10 +2,8 @@ import base64 import json import os from unittest import SkipTest -import botocore.client import boto3 import hashlib -import sure # noqa # pylint: disable=unused-import import pytest from botocore.exceptions import ClientError @@ -32,7 +30,7 @@ boto3.setup_default_session(region_name=_lambda_region) def test_lambda_regions(region): client = boto3.client("lambda", region_name=region) resp = client.list_functions() - resp["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 @mock_lambda @@ -41,7 +39,7 @@ def test_list_functions(): function_name = str(uuid4())[0:6] initial_list = conn.list_functions()["Functions"] initial_names = [f["FunctionName"] for f in initial_list] - initial_names.shouldnt.contain(function_name) + assert function_name not in initial_names conn.create_function( FunctionName=function_name, @@ -51,28 +49,30 @@ def test_list_functions(): Code={"ZipFile": get_test_zip_file1()}, ) names = [f["FunctionName"] for f in conn.list_functions()["Functions"]] - names.should.contain(function_name) + assert function_name in names conn.publish_version(FunctionName=function_name, Description="v2") func_list = conn.list_functions()["Functions"] our_functions = [f for f in func_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(1) + assert len(our_functions) == 1 # FunctionVersion=ALL means we should get a list of all versions full_list = conn.list_functions(FunctionVersion="ALL")["Functions"] our_functions = [f for f in full_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(2) + assert len(our_functions) == 2 v1 = [f for f in our_functions if f["Version"] == "1"][0] - v1["Description"].should.equal("v2") - v1["FunctionArn"].should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:1" + assert v1["Description"] == "v2" + assert ( + v1["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:1" ) latest = [f for f in our_functions if f["Version"] == "$LATEST"][0] - latest["Description"].should.equal("") - latest["FunctionArn"].should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST" + assert latest["Description"] == "" + assert ( + latest["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST" ) @@ -81,18 +81,27 @@ def test_create_based_on_s3_with_missing_bucket(): conn = boto3.client("lambda", _lambda_region) function_name = str(uuid4())[0:6] - conn.create_function.when.called_with( - FunctionName=function_name, - Runtime="python2.7", - Role=get_role_name(), - Handler="lambda_function.lambda_handler", - Code={"S3Bucket": "this-bucket-does-not-exist", "S3Key": "test.zip"}, - Description="test lambda function", - Timeout=3, - MemorySize=128, - Publish=True, - VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]}, - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError) as exc: + conn.create_function( + FunctionName=function_name, + Runtime="python2.7", + Role=get_role_name(), + Handler="lambda_function.lambda_handler", + Code={"S3Bucket": "this-bucket-does-not-exist", "S3Key": "test.zip"}, + Description="test lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + VpcConfig={ + "SecurityGroupIds": ["sg-123abc"], + "SubnetIds": ["subnet-123abc"], + }, + ) + err = exc.value.response["Error"] + assert ( + err["Message"] + == "Error occurred while GetObject. S3 Error Code: NoSuchBucket. S3 Error Message: The specified bucket does not exist" + ) @mock_lambda @@ -125,16 +134,17 @@ def test_create_function_from_aws_bucket(): VpcConfig={"SecurityGroupIds": ["sg-123abc"], "SubnetIds": ["subnet-123abc"]}, ) - result.should.have.key("FunctionName").equals(function_name) - result.should.have.key("FunctionArn").equals( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" + assert result["FunctionName"] == function_name + assert ( + result["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" ) - result.should.have.key("Runtime").equals("python2.7") - result.should.have.key("Handler").equals("lambda_function.lambda_handler") - result.should.have.key("CodeSha256").equals( - base64.b64encode(hashlib.sha256(zip_content).digest()).decode("utf-8") - ) - result.should.have.key("State").equals("Active") + assert result["Runtime"] == "python2.7" + assert result["Handler"] == "lambda_function.lambda_handler" + assert result["CodeSha256"] == base64.b64encode( + hashlib.sha256(zip_content).digest() + ).decode("utf-8") + assert result["State"] == "Active" @mock_lambda @@ -160,28 +170,26 @@ def test_create_function_from_zipfile(): result["ResponseMetadata"].pop("RetryAttempts", None) result.pop("LastModified") - result.should.equal( - { - "FunctionName": function_name, - "FunctionArn": f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}", - "Runtime": "python2.7", - "Role": result["Role"], - "Handler": "lambda_function.lambda_handler", - "CodeSize": len(zip_content), - "Description": "test lambda function", - "Timeout": 3, - "MemorySize": 128, - "CodeSha256": base64.b64encode(hashlib.sha256(zip_content).digest()).decode( - "utf-8" - ), - "Version": "1", - "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []}, - "ResponseMetadata": {"HTTPStatusCode": 201}, - "State": "Active", - "Layers": [], - "TracingConfig": {"Mode": "PassThrough"}, - } - ) + assert result == { + "FunctionName": function_name, + "FunctionArn": f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}", + "Runtime": "python2.7", + "Role": result["Role"], + "Handler": "lambda_function.lambda_handler", + "CodeSize": len(zip_content), + "Description": "test lambda function", + "Timeout": 3, + "MemorySize": 128, + "CodeSha256": base64.b64encode(hashlib.sha256(zip_content).digest()).decode( + "utf-8" + ), + "Version": "1", + "VpcConfig": {"SecurityGroupIds": [], "SubnetIds": []}, + "ResponseMetadata": {"HTTPStatusCode": 201}, + "State": "Active", + "Layers": [], + "TracingConfig": {"Mode": "PassThrough"}, + } @mock_lambda @@ -208,7 +216,7 @@ def test_create_function__with_tracingmode(tracing_mode): if source: kwargs["TracingConfig"] = {"Mode": source} result = conn.create_function(**kwargs) - result.should.have.key("TracingConfig").should.equal({"Mode": output}) + assert result["TracingConfig"] == {"Mode": output} @pytest.fixture(name="with_ecr_mock") @@ -216,7 +224,7 @@ def ecr_repo_fixture(): with mock_ecr(): os.environ["MOTO_LAMBDA_STUB_ECR"] = "FALSE" repo_name = "testlambdaecr" - ecr_client = ecr_client = boto3.client("ecr", "us-east-1") + ecr_client = boto3.client("ecr", "us-east-1") ecr_client.create_repository(repositoryName=repo_name) response = ecr_client.put_image( repositoryName=repo_name, @@ -244,21 +252,21 @@ def test_create_function_from_stubbed_ecr(): resp = lambda_client.create_function(**dic) - resp.should.have.key("FunctionName").equals(fn_name) - resp.should.have.key("CodeSize").equals(0) - resp.should.have.key("CodeSha256") - resp.should.have.key("PackageType").equals("Image") + assert resp["FunctionName"] == fn_name + assert resp["CodeSize"] == 0 + assert "CodeSha256" in resp + assert resp["PackageType"] == "Image" result = lambda_client.get_function(FunctionName=fn_name) - result.should.have.key("Configuration") + assert "Configuration" in result config = result["Configuration"] - result.should.have.key("Code") + assert "Code" in result code = result["Code"] - code.should.have.key("RepositoryType").equals("ECR") - code.should.have.key("ImageUri").equals(image_uri) + assert code["RepositoryType"] == "ECR" + assert code["ImageUri"] == image_uri image_uri_without_tag = image_uri.split(":")[0] resolved_image_uri = f"{image_uri_without_tag}@sha256:{config['CodeSha256']}" - code.should.have.key("ResolvedImageUri").equals(resolved_image_uri) + assert code["ResolvedImageUri"] == resolved_image_uri @mock_lambda @@ -284,25 +292,23 @@ def test_create_function_from_mocked_ecr_image_tag( } resp = lambda_client.create_function(**dic) - resp.should.have.key("FunctionName").equals(fn_name) - resp.should.have.key("CodeSize").greater_than(0) - resp.should.have.key("CodeSha256") - resp.should.have.key("PackageType").equals("Image") + assert resp["FunctionName"] == fn_name + assert resp["CodeSize"] > 0 + assert "CodeSha256" in resp + assert resp["PackageType"] == "Image" result = lambda_client.get_function(FunctionName=fn_name) - result.should.have.key("Configuration") + assert "Configuration" in result config = result["Configuration"] - config.should.have.key("CodeSha256").equals( - image["imageDigest"].replace("sha256:", "") - ) - config.should.have.key("CodeSize").equals(resp["CodeSize"]) - result.should.have.key("Code") + assert config["CodeSha256"] == image["imageDigest"].replace("sha256:", "") + assert config["CodeSize"] == resp["CodeSize"] + assert "Code" in result code = result["Code"] - code.should.have.key("RepositoryType").equals("ECR") - code.should.have.key("ImageUri").equals(image_uri) + assert code["RepositoryType"] == "ECR" + assert code["ImageUri"] == image_uri image_uri_without_tag = image_uri.split(":")[0] resolved_image_uri = f"{image_uri_without_tag}@sha256:{config['CodeSha256']}" - code.should.have.key("ResolvedImageUri").equals(resolved_image_uri) + assert code["ResolvedImageUri"] == resolved_image_uri @mock_lambda @@ -326,12 +332,10 @@ def test_create_function_from_mocked_ecr_image_digest( "Timeout": 100, } resp = lambda_client.create_function(**dic) - resp.should.have.key("FunctionName").equals(fn_name) - resp.should.have.key("CodeSize").greater_than(0) - resp.should.have.key("CodeSha256").equals( - image["imageDigest"].replace("sha256:", "") - ) - resp.should.have.key("PackageType").equals("Image") + assert resp["FunctionName"] == fn_name + assert resp["CodeSize"] > 0 + assert resp["CodeSha256"] == image["imageDigest"].replace("sha256:", "") + assert resp["PackageType"] == "Image" @mock_lambda @@ -360,9 +364,10 @@ def test_create_function_from_mocked_ecr_missing_image( lambda_client.create_function(**dic) err = exc.value.response["Error"] - err["Code"].should.equal("ImageNotFoundException") - err["Message"].should.equal( - "The image with imageId {'imageTag': 'dne'} does not exist within the repository with name 'testlambdaecr' in the registry with id '123456789012'" + assert err["Code"] == "ImageNotFoundException" + assert ( + err["Message"] + == "The image with imageId {'imageTag': 'dne'} does not exist within the repository with name 'testlambdaecr' in the registry with id '123456789012'" ) @@ -402,36 +407,38 @@ def test_get_function(): result["ResponseMetadata"].pop("RetryAttempts", None) result["Configuration"].pop("LastModified") - result["Code"]["Location"].should.equal( - f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com/test.zip" + assert ( + result["Code"]["Location"] + == f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com/test.zip" ) - result["Code"]["RepositoryType"].should.equal("S3") + assert result["Code"]["RepositoryType"] == "S3" - result["Configuration"]["CodeSha256"].should.equal( - base64.b64encode(hashlib.sha256(zip_content).digest()).decode("utf-8") - ) - result["Configuration"]["CodeSize"].should.equal(len(zip_content)) - result["Configuration"]["Description"].should.equal("test lambda function") - result["Configuration"].should.contain("FunctionArn") - result["Configuration"]["FunctionName"].should.equal(function_name) - result["Configuration"]["Handler"].should.equal("lambda_function.lambda_handler") - result["Configuration"]["MemorySize"].should.equal(128) - result["Configuration"]["Role"].should.equal(get_role_name()) - result["Configuration"]["Runtime"].should.equal("python2.7") - result["Configuration"]["Timeout"].should.equal(3) - result["Configuration"]["Version"].should.equal("$LATEST") - result["Configuration"].should.contain("VpcConfig") - result["Configuration"].should.contain("Environment") - result["Configuration"]["Environment"].should.contain("Variables") - result["Configuration"]["Environment"]["Variables"].should.equal( - {"test_variable": "test_value"} - ) + assert result["Configuration"]["CodeSha256"] == base64.b64encode( + hashlib.sha256(zip_content).digest() + ).decode("utf-8") + assert result["Configuration"]["CodeSize"] == len(zip_content) + assert result["Configuration"]["Description"] == "test lambda function" + assert "FunctionArn" in result["Configuration"] + assert result["Configuration"]["FunctionName"] == function_name + assert result["Configuration"]["Handler"] == "lambda_function.lambda_handler" + assert result["Configuration"]["MemorySize"] == 128 + assert result["Configuration"]["Role"] == get_role_name() + assert result["Configuration"]["Runtime"] == "python2.7" + assert result["Configuration"]["Timeout"] == 3 + assert result["Configuration"]["Version"] == "$LATEST" + assert "VpcConfig" in result["Configuration"] + assert "Environment" in result["Configuration"] + assert "Variables" in result["Configuration"]["Environment"] + assert result["Configuration"]["Environment"]["Variables"] == { + "test_variable": "test_value" + } # Test get function with qualifier result = conn.get_function(FunctionName=function_name, Qualifier="$LATEST") - result["Configuration"]["Version"].should.equal("$LATEST") - result["Configuration"]["FunctionArn"].should.equal( - f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:$LATEST" + assert result["Configuration"]["Version"] == "$LATEST" + assert ( + result["Configuration"]["FunctionArn"] + == f"arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:$LATEST" ) # Test get function when can't find function name @@ -472,31 +479,32 @@ def test_get_function_configuration(key): result = conn.get_function_configuration(FunctionName=name_or_arn) - result["CodeSha256"].should.equal( - base64.b64encode(hashlib.sha256(zip_content).digest()).decode("utf-8") - ) - result["CodeSize"].should.equal(len(zip_content)) - result["Description"].should.equal("test lambda function") - result.should.contain("FunctionArn") - result["FunctionName"].should.equal(function_name) - result["Handler"].should.equal("lambda_function.lambda_handler") - result["MemorySize"].should.equal(128) - result["Role"].should.equal(get_role_name()) - result["Runtime"].should.equal("python2.7") - result["Timeout"].should.equal(3) - result["Version"].should.equal("$LATEST") - result.should.contain("VpcConfig") - result.should.contain("Environment") - result["Environment"].should.contain("Variables") - result["Environment"]["Variables"].should.equal({"test_variable": "test_value"}) + assert result["CodeSha256"] == base64.b64encode( + hashlib.sha256(zip_content).digest() + ).decode("utf-8") + assert result["CodeSize"] == len(zip_content) + assert result["Description"] == "test lambda function" + assert "FunctionArn" in result + assert result["FunctionName"] == function_name + assert result["Handler"] == "lambda_function.lambda_handler" + assert result["MemorySize"] == 128 + assert result["Role"] == get_role_name() + assert result["Runtime"] == "python2.7" + assert result["Timeout"] == 3 + assert result["Version"] == "$LATEST" + assert "VpcConfig" in result + assert "Environment" in result + assert "Variables" in result["Environment"] + assert result["Environment"]["Variables"] == {"test_variable": "test_value"} # Test get function with qualifier result = conn.get_function_configuration( FunctionName=name_or_arn, Qualifier="$LATEST" ) - result["Version"].should.equal("$LATEST") - result["FunctionArn"].should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST" + assert result["Version"] == "$LATEST" + assert ( + result["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST" ) # Test get function when can't find function name @@ -532,8 +540,8 @@ def test_get_function_code_signing_config(key): result = conn.get_function_code_signing_config(FunctionName=name_or_arn) - result["FunctionName"].should.equal(function_name) - result["CodeSigningConfigArn"].should.equal("csc:arn") + assert result["FunctionName"] == function_name + assert result["CodeSigningConfigArn"] == "csc:arn" @mock_lambda @@ -564,7 +572,7 @@ def test_get_function_by_arn(): ) result = conn.get_function(FunctionName=fnc["FunctionArn"]) - result["Configuration"]["FunctionName"].should.equal(function_name) + assert result["Configuration"]["FunctionName"] == function_name @mock_lambda @@ -600,11 +608,11 @@ def test_delete_function(): # Botocore inserts retry attempts not seen in Python27 success_result["ResponseMetadata"].pop("RetryAttempts", None) - success_result.should.equal({"ResponseMetadata": {"HTTPStatusCode": 204}}) + assert success_result == {"ResponseMetadata": {"HTTPStatusCode": 204}} func_list = conn.list_functions()["Functions"] our_functions = [f for f in func_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(0) + assert len(our_functions) == 0 @mock_lambda @@ -638,15 +646,16 @@ def test_delete_function_by_arn(): func_list = conn.list_functions()["Functions"] our_functions = [f for f in func_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(0) + assert len(our_functions) == 0 @mock_lambda def test_delete_unknown_function(): conn = boto3.client("lambda", _lambda_region) - conn.delete_function.when.called_with( - FunctionName="testFunctionThatDoesntExist" - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError) as exc: + conn.delete_function(FunctionName="testFunctionThatDoesntExist") + err = exc.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" @mock_lambda @@ -662,9 +671,10 @@ def test_publish_version_unknown_function(name): with pytest.raises(ClientError) as exc: client.publish_version(FunctionName=name, Description="v2") err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") - err["Message"].should.equal( - f"Function not found: arn:aws:lambda:eu-west-1:{ACCOUNT_ID}:function:bad_function_name" + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] + == f"Function not found: arn:aws:lambda:eu-west-1:{ACCOUNT_ID}:function:bad_function_name" ) @@ -697,7 +707,7 @@ def test_publish(): function_list = conn.list_functions(FunctionVersion="ALL")["Functions"] our_functions = [f for f in function_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(1) + assert len(our_functions) == 1 latest_arn = our_functions[0]["FunctionArn"] res = conn.publish_version(FunctionName=function_name) @@ -705,18 +715,18 @@ def test_publish(): function_list = conn.list_functions(FunctionVersion="ALL")["Functions"] our_functions = [f for f in function_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(2) + assert len(our_functions) == 2 # #SetComprehension ;-) published_arn = list({f["FunctionArn"] for f in our_functions} - {latest_arn})[0] - published_arn.should.contain(f"{function_name}:1") + assert f"{function_name}:1" in published_arn conn.delete_function(FunctionName=function_name, Qualifier="1") function_list = conn.list_functions()["Functions"] our_functions = [f for f in function_list if f["FunctionName"] == function_name] - our_functions.should.have.length_of(1) - our_functions[0]["FunctionArn"].should.contain(function_name) + assert len(our_functions) == 1 + assert function_name in our_functions[0]["FunctionArn"] @mock_lambda @@ -741,7 +751,7 @@ def test_list_create_list_get_delete_list(): initial_list = conn.list_functions()["Functions"] initial_names = [f["FunctionName"] for f in initial_list] - initial_names.shouldnt.contain(function_name) + assert function_name not in initial_names function_name = function_name conn.create_function( @@ -783,36 +793,40 @@ def test_list_create_list_get_delete_list(): } functions = conn.list_functions()["Functions"] func_names = [f["FunctionName"] for f in functions] - func_names.should.contain(function_name) + assert function_name in func_names func_arn = [ f["FunctionArn"] for f in functions if f["FunctionName"] == function_name ][0] - func_arn.should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" + assert ( + func_arn + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" ) functions = conn.list_functions(FunctionVersion="ALL")["Functions"] our_functions = [f for f in functions if f["FunctionName"] == function_name] - our_functions.should.have.length_of(2) + assert len(our_functions) == 2 latest = [f for f in our_functions if f["Version"] == "$LATEST"][0] - latest["FunctionArn"].should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST" + assert ( + latest["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:$LATEST" ) latest.pop("FunctionArn") latest.pop("LastModified") - latest.should.equal(expected_function_result["Configuration"]) + assert latest == expected_function_result["Configuration"] published = [f for f in our_functions if f["Version"] != "$LATEST"][0] - published["Version"].should.equal("1") - published["FunctionArn"].should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:1" + assert published["Version"] == "1" + assert ( + published["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:1" ) func = conn.get_function(FunctionName=function_name) - func["Configuration"]["FunctionArn"].should.equal( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" + assert ( + func["Configuration"]["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" ) # this is hard to match against, so remove it @@ -822,12 +836,12 @@ def test_list_create_list_get_delete_list(): func["Configuration"].pop("LastModified") func["Configuration"].pop("FunctionArn") - func.should.equal(expected_function_result) + assert func == expected_function_result conn.delete_function(FunctionName=function_name) functions = conn.list_functions()["Functions"] func_names = [f["FunctionName"] for f in functions] - func_names.shouldnt.contain(function_name) + assert function_name not in func_names @mock_lambda @@ -850,32 +864,33 @@ def test_get_function_created_with_zipfile(): response = conn.get_function(FunctionName=function_name) - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert len(response["Code"]) == 2 assert response["Code"]["RepositoryType"] == "S3" assert response["Code"]["Location"].startswith( f"s3://awslambda-{_lambda_region}-tasks.s3-{_lambda_region}.amazonaws.com" ) - response.should.have.key("Configuration") + assert "Configuration" in response config = response["Configuration"] - config.should.have.key("CodeSha256").equals( - base64.b64encode(hashlib.sha256(zip_content).digest()).decode("utf-8") + assert config["CodeSha256"] == base64.b64encode( + hashlib.sha256(zip_content).digest() + ).decode("utf-8") + assert config["CodeSize"] == len(zip_content) + assert config["Description"] == "test lambda function" + assert ( + config["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" ) - config.should.have.key("CodeSize").equals(len(zip_content)) - config.should.have.key("Description").equals("test lambda function") - config.should.have.key("FunctionArn").equals( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}" - ) - config.should.have.key("FunctionName").equals(function_name) - config.should.have.key("Handler").equals("lambda_function.handler") - config.should.have.key("MemorySize").equals(128) - config.should.have.key("Role").equals(get_role_name()) - config.should.have.key("Runtime").equals("python2.7") - config.should.have.key("Timeout").equals(3) - config.should.have.key("Version").equals("$LATEST") - config.should.have.key("State").equals("Active") - config.should.have.key("Layers").equals([]) - config.should.have.key("LastUpdateStatus").equals("Successful") + assert config["FunctionName"] == function_name + assert config["Handler"] == "lambda_function.handler" + assert config["MemorySize"] == 128 + assert config["Role"] == get_role_name() + assert config["Runtime"] == "python2.7" + assert config["Timeout"] == 3 + assert config["Version"] == "$LATEST" + assert config["State"] == "Active" + assert config["Layers"] == [] + assert config["LastUpdateStatus"] == "Successful" @mock_lambda @@ -1083,11 +1098,11 @@ def test_update_function_zip(key): update1 = conn.update_function_code( FunctionName=name_or_arn, ZipFile=zip_content_two, Publish=True ) - update1["CodeSha256"].shouldnt.equal(first_sha) + assert update1["CodeSha256"] != first_sha response = conn.get_function(FunctionName=function_name, Qualifier="2") - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert len(response["Code"]) == 2 assert response["Code"]["RepositoryType"] == "S3" assert response["Code"]["Location"].startswith( @@ -1095,38 +1110,39 @@ def test_update_function_zip(key): ) config = response["Configuration"] - config.should.have.key("CodeSize").equals(len(zip_content_two)) - config.should.have.key("Description").equals("test lambda function") - config.should.have.key("FunctionArn").equals( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:2" + assert config["CodeSize"] == len(zip_content_two) + assert config["Description"] == "test lambda function" + assert ( + config["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:2" ) - config.should.have.key("FunctionName").equals(function_name) - config.should.have.key("Version").equals("2") - config.should.have.key("LastUpdateStatus").equals("Successful") - config.should.have.key("CodeSha256").equals(update1["CodeSha256"]) + assert config["FunctionName"] == function_name + assert config["Version"] == "2" + assert config["LastUpdateStatus"] == "Successful" + assert config["CodeSha256"] == update1["CodeSha256"] most_recent_config = conn.get_function(FunctionName=function_name) - most_recent_config["Configuration"]["CodeSha256"].should.equal( - update1["CodeSha256"] - ) + assert most_recent_config["Configuration"]["CodeSha256"] == update1["CodeSha256"] # Publishing this again, with the same code, gives us the same version same_update = conn.update_function_code( FunctionName=name_or_arn, ZipFile=zip_content_two, Publish=True ) - same_update["FunctionArn"].should.equal( - most_recent_config["Configuration"]["FunctionArn"] + ":2" + assert ( + same_update["FunctionArn"] + == most_recent_config["Configuration"]["FunctionArn"] + ":2" ) - same_update["Version"].should.equal("2") + assert same_update["Version"] == "2" # Only when updating the code should we have a new version new_update = conn.update_function_code( FunctionName=name_or_arn, ZipFile=get_test_zip_file3(), Publish=True ) - new_update["FunctionArn"].should.equal( - most_recent_config["Configuration"]["FunctionArn"] + ":3" + assert ( + new_update["FunctionArn"] + == most_recent_config["Configuration"]["FunctionArn"] + ":3" ) - new_update["Version"].should.equal("3") + assert new_update["Version"] == "3" @mock_lambda @@ -1169,7 +1185,7 @@ def test_update_function_s3(): response = conn.get_function(FunctionName=function_name, Qualifier="2") - response["ResponseMetadata"]["HTTPStatusCode"].should.equal(200) + assert response["ResponseMetadata"]["HTTPStatusCode"] == 200 assert len(response["Code"]) == 2 assert response["Code"]["RepositoryType"] == "S3" assert response["Code"]["Location"].startswith( @@ -1177,32 +1193,35 @@ def test_update_function_s3(): ) config = response["Configuration"] - config.should.have.key("CodeSha256").equals( - base64.b64encode(hashlib.sha256(zip_content_two).digest()).decode("utf-8") + assert config["CodeSha256"] == base64.b64encode( + hashlib.sha256(zip_content_two).digest() + ).decode("utf-8") + assert config["CodeSize"] == len(zip_content_two) + assert config["Description"] == "test lambda function" + assert ( + config["FunctionArn"] + == f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:2" ) - config.should.have.key("CodeSize").equals(len(zip_content_two)) - config.should.have.key("Description").equals("test lambda function") - config.should.have.key("FunctionArn").equals( - f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:function:{function_name}:2" - ) - config.should.have.key("FunctionName").equals(function_name) - config.should.have.key("Version").equals("2") - config.should.have.key("LastUpdateStatus").equals("Successful") + assert config["FunctionName"] == function_name + assert config["Version"] == "2" + assert config["LastUpdateStatus"] == "Successful" @mock_lambda def test_create_function_with_invalid_arn(): err = create_invalid_lambda("test-iam-role") - err.value.response["Error"]["Message"].should.equal( - r"1 validation error detected: Value 'test-iam-role' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+" + assert ( + err.value.response["Error"]["Message"] + == r"1 validation error detected: Value 'test-iam-role' at 'role' failed to satisfy constraint: Member must satisfy regular expression pattern: arn:(aws[a-zA-Z-]*)?:iam::(\d{12}):role/?[a-zA-Z_0-9+=,.@\-_/]+" ) @mock_lambda def test_create_function_with_arn_from_different_account(): err = create_invalid_lambda("arn:aws:iam::000000000000:role/example_role") - err.value.response["Error"]["Message"].should.equal( - "Cross-account pass role is not allowed." + assert ( + err.value.response["Error"]["Message"] + == "Cross-account pass role is not allowed." ) @@ -1211,8 +1230,9 @@ def test_create_function_with_unknown_arn(): err = create_invalid_lambda( "arn:aws:iam::" + str(ACCOUNT_ID) + ":role/service-role/unknown_role" ) - err.value.response["Error"]["Message"].should.equal( - "The role defined for the function cannot be assumed by Lambda." + assert ( + err.value.response["Error"]["Message"] + == "The role defined for the function cannot be assumed by Lambda." ) @@ -1233,8 +1253,8 @@ def test_remove_unknown_permission_throws_error(): with pytest.raises(ClientError) as exc: conn.remove_permission(FunctionName=arn, StatementId="1") err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") - err["Message"].should.equal("No policy is associated with the given resource.") + assert err["Code"] == "ResourceNotFoundException" + assert err["Message"] == "No policy is associated with the given resource." @mock_lambda @@ -1258,18 +1278,19 @@ def test_multiple_qualifiers(): resp = client.list_versions_by_function(FunctionName=fn_name)["Versions"] qualis = [fn["FunctionArn"].split(":")[-1] for fn in resp] - qualis.should.equal(["$LATEST", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]) + assert qualis == ["$LATEST", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10"] client.delete_function(FunctionName=fn_name, Qualifier="4") client.delete_function(FunctionName=fn_name, Qualifier="5") resp = client.list_versions_by_function(FunctionName=fn_name)["Versions"] qualis = [fn["FunctionArn"].split(":")[-1] for fn in resp] - qualis.should.equal(["$LATEST", "1", "2", "3", "6", "7", "8", "9", "10"]) + assert qualis == ["$LATEST", "1", "2", "3", "6", "7", "8", "9", "10"] fn = client.get_function(FunctionName=fn_name, Qualifier="6")["Configuration"] - fn["FunctionArn"].should.equal( - f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:{fn_name}:6" + assert ( + fn["FunctionArn"] + == f"arn:aws:lambda:us-east-1:{ACCOUNT_ID}:function:{fn_name}:6" ) diff --git a/tests/test_awslambda/test_lambda_alias.py b/tests/test_awslambda/test_lambda_alias.py index 866dd5c7a..a0fd33150 100644 --- a/tests/test_awslambda/test_lambda_alias.py +++ b/tests/test_awslambda/test_lambda_alias.py @@ -33,14 +33,15 @@ def test_create_alias(): FunctionName=function_name, Name="alias1", FunctionVersion="$LATEST" ) - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:ap-southeast-1:{ACCOUNT_ID}:function:{function_name}:alias1" + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:ap-southeast-1:{ACCOUNT_ID}:function:{function_name}:alias1" ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("$LATEST") - resp.should.have.key("Description").equals("") - resp.should.have.key("RevisionId") - resp.shouldnt.have.key("RoutingConfig") + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "$LATEST" + assert resp["Description"] == "" + assert "RevisionId" in resp + assert "RoutingConfig" not in resp @mock_lambda @@ -64,11 +65,9 @@ def test_create_alias_with_routing_config(): RoutingConfig={"AdditionalVersionWeights": {"2": 0.5}}, ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("Description").equals("desc") - resp.should.have.key("RoutingConfig").equals( - {"AdditionalVersionWeights": {"2": 0.5}} - ) + assert resp["Name"] == "alias1" + assert resp["Description"] == "desc" + assert resp["RoutingConfig"] == {"AdditionalVersionWeights": {"2": 0.5}} @mock_lambda @@ -89,11 +88,12 @@ def test_create_alias_using_function_arn(): FunctionName=fn_arn, Name="alias1", FunctionVersion="$LATEST" ) - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:ap-southeast-1:{ACCOUNT_ID}:function:{function_name}:alias1" + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:ap-southeast-1:{ACCOUNT_ID}:function:{function_name}:alias1" ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("$LATEST") + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "$LATEST" @mock_lambda @@ -118,7 +118,7 @@ def test_delete_alias(): with pytest.raises(ClientError) as exc: client.get_alias(FunctionName=function_name, Name="alias1") err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") + assert err["Code"] == "ResourceNotFoundException" @mock_lambda @@ -140,13 +140,14 @@ def test_get_alias(): resp = client.get_alias(FunctionName=function_name, Name="alias1") - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:alias1" + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:alias1" ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("$LATEST") - resp.should.have.key("Description").equals("") - resp.should.have.key("RevisionId") + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "$LATEST" + assert resp["Description"] == "" + assert "RevisionId" in resp @mock_lambda @@ -169,13 +170,14 @@ def test_get_alias_using_function_arn(): resp = client.get_alias(FunctionName=fn_arn, Name="alias1") - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:alias1" + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:alias1" ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("$LATEST") - resp.should.have.key("Description").equals("") - resp.should.have.key("RevisionId") + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "$LATEST" + assert resp["Description"] == "" + assert "RevisionId" in resp @mock_lambda @@ -198,13 +200,14 @@ def test_get_alias_using_alias_arn(): resp = client.get_alias(FunctionName=alias_arn, Name="alias1") - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:alias1" + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:alias1" ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("$LATEST") - resp.should.have.key("Description").equals("") - resp.should.have.key("RevisionId") + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "$LATEST" + assert resp["Description"] == "" + assert "RevisionId" in resp @mock_lambda @@ -223,9 +226,10 @@ def test_get_unknown_alias(): with pytest.raises(ClientError) as exc: client.get_alias(FunctionName=function_name, Name="unknown") err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") - err["Message"].should.equal( - f"Cannot find alias arn: arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:unknown" + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] + == f"Cannot find alias arn: arn:aws:lambda:us-west-1:{ACCOUNT_ID}:function:{function_name}:unknown" ) @@ -253,13 +257,14 @@ def test_update_alias(): Description="updated desc", ) - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:us-east-2:{ACCOUNT_ID}:function:{function_name}:alias1" + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:us-east-2:{ACCOUNT_ID}:function:{function_name}:alias1" ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("1") - resp.should.have.key("Description").equals("updated desc") - resp.should.have.key("RevisionId") + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "1" + assert resp["Description"] == "updated desc" + assert "RevisionId" in resp @mock_lambda @@ -288,15 +293,14 @@ def test_update_alias_routingconfig(): RoutingConfig={"AdditionalVersionWeights": {"2": 0.5}}, ) - resp.should.have.key("AliasArn").equals( - f"arn:aws:lambda:us-east-2:{ACCOUNT_ID}:function:{function_name}:alias1" - ) - resp.should.have.key("Name").equals("alias1") - resp.should.have.key("FunctionVersion").equals("$LATEST") - resp.should.have.key("Description").equals("desc") - resp.should.have.key("RoutingConfig").equals( - {"AdditionalVersionWeights": {"2": 0.5}} + assert ( + resp["AliasArn"] + == f"arn:aws:lambda:us-east-2:{ACCOUNT_ID}:function:{function_name}:alias1" ) + assert resp["Name"] == "alias1" + assert resp["FunctionVersion"] == "$LATEST" + assert resp["Description"] == "desc" + assert resp["RoutingConfig"] == {"AdditionalVersionWeights": {"2": 0.5}} @mock_lambda @@ -317,6 +321,7 @@ def test_get_function_using_alias(): client.create_alias(FunctionName=fn_name, Name="live", FunctionVersion="1") fn = client.get_function(FunctionName=fn_name, Qualifier="live")["Configuration"] - fn["FunctionArn"].should.equal( - f"arn:aws:lambda:us-east-2:{ACCOUNT_ID}:function:{fn_name}:1" + assert ( + fn["FunctionArn"] + == f"arn:aws:lambda:us-east-2:{ACCOUNT_ID}:function:{fn_name}:1" ) diff --git a/tests/test_awslambda/test_lambda_concurrency.py b/tests/test_awslambda/test_lambda_concurrency.py index 0f2dbb3ae..ebcf67b2b 100644 --- a/tests/test_awslambda/test_lambda_concurrency.py +++ b/tests/test_awslambda/test_lambda_concurrency.py @@ -1,6 +1,5 @@ import boto3 import pytest -import sure # noqa # pylint: disable=unused-import from moto import mock_lambda from uuid import uuid4 @@ -33,7 +32,7 @@ def test_put_function_concurrency(key): FunctionName=name_or_arn, ReservedConcurrentExecutions=expected_concurrency ) - result["ReservedConcurrentExecutions"].should.equal(expected_concurrency) + assert result["ReservedConcurrentExecutions"] == expected_concurrency @pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"]) @@ -61,7 +60,7 @@ def test_delete_function_concurrency(key): conn.delete_function_concurrency(FunctionName=name_or_arn) result = conn.get_function(FunctionName=function_name) - result.doesnt.have.key("Concurrency") + assert "Concurrency" not in result @pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"]) @@ -89,4 +88,4 @@ def test_get_function_concurrency(key): result = conn.get_function_concurrency(FunctionName=name_or_arn) - result["ReservedConcurrentExecutions"].should.equal(expected_concurrency) + assert result["ReservedConcurrentExecutions"] == expected_concurrency diff --git a/tests/test_awslambda/test_lambda_eventsourcemapping.py b/tests/test_awslambda/test_lambda_eventsourcemapping.py index 367591626..a7ffafaad 100644 --- a/tests/test_awslambda/test_lambda_eventsourcemapping.py +++ b/tests/test_awslambda/test_lambda_eventsourcemapping.py @@ -1,11 +1,10 @@ -import botocore.client import boto3 import json import pytest import time -import sure # noqa # pylint: disable=unused-import import uuid +from botocore.exceptions import ClientError from moto import mock_dynamodb, mock_lambda, mock_logs, mock_sns, mock_sqs from uuid import uuid4 from .utilities import ( @@ -358,7 +357,7 @@ def test_list_event_source_mappings(): EventSourceArn=queue.attributes["QueueArn"], FunctionName=func["FunctionArn"] ) mappings = conn.list_event_source_mappings(EventSourceArn="123") - mappings["EventSourceMappings"].should.have.length_of(0) + assert len(mappings["EventSourceMappings"]) == 0 mappings = conn.list_event_source_mappings( EventSourceArn=queue.attributes["QueueArn"] @@ -394,9 +393,8 @@ def test_get_event_source_mapping(): assert mapping["UUID"] == response["UUID"] assert mapping["FunctionArn"] == func["FunctionArn"] - conn.get_event_source_mapping.when.called_with(UUID="1").should.throw( - botocore.client.ClientError - ) + with pytest.raises(ClientError): + conn.get_event_source_mapping(UUID="1") @mock_lambda @@ -474,6 +472,5 @@ def test_delete_event_source_mapping(): response = conn.delete_event_source_mapping(UUID=response["UUID"]) assert response["State"] == "Deleting" - conn.get_event_source_mapping.when.called_with(UUID=response["UUID"]).should.throw( - botocore.client.ClientError - ) + with pytest.raises(ClientError): + conn.get_event_source_mapping(UUID=response["UUID"]) diff --git a/tests/test_awslambda/test_lambda_function_urls.py b/tests/test_awslambda/test_lambda_function_urls.py index 3353b3018..a471ff0bc 100644 --- a/tests/test_awslambda/test_lambda_function_urls.py +++ b/tests/test_awslambda/test_lambda_function_urls.py @@ -23,14 +23,14 @@ def test_create_function_url_config(key): resp = client.create_function_url_config( AuthType="AWS_IAM", FunctionName=name_or_arn ) - resp.should.have.key("FunctionArn").equals(fxn["FunctionArn"]) - resp.should.have.key("AuthType").equals("AWS_IAM") - resp.should.have.key("FunctionUrl") + assert resp["FunctionArn"] == fxn["FunctionArn"] + assert resp["AuthType"] == "AWS_IAM" + assert "FunctionUrl" in resp resp = client.get_function_url_config(FunctionName=name_or_arn) - resp.should.have.key("FunctionArn").equals(fxn["FunctionArn"]) - resp.should.have.key("AuthType").equals("AWS_IAM") - resp.should.have.key("FunctionUrl") + assert resp["FunctionArn"] == fxn["FunctionArn"] + assert resp["AuthType"] == "AWS_IAM" + assert "FunctionUrl" in resp @mock_lambda @@ -58,16 +58,14 @@ def test_create_function_url_config_with_cors(): "MaxAge": 86400, }, ) - resp.should.have.key("Cors").equals( - { - "AllowCredentials": True, - "AllowHeaders": ["date", "keep-alive"], - "AllowMethods": ["*"], - "AllowOrigins": ["*"], - "ExposeHeaders": ["date", "keep-alive"], - "MaxAge": 86400, - } - ) + assert resp["Cors"] == { + "AllowCredentials": True, + "AllowHeaders": ["date", "keep-alive"], + "AllowMethods": ["*"], + "AllowOrigins": ["*"], + "ExposeHeaders": ["date", "keep-alive"], + "MaxAge": 86400, + } @mock_lambda @@ -99,7 +97,7 @@ def test_update_function_url_config_with_cors(): resp = client.update_function_url_config( FunctionName=name_or_arn, AuthType="NONE", Cors={"AllowCredentials": False} ) - resp.should.have.key("Cors").equals({"AllowCredentials": False}) + assert resp["Cors"] == {"AllowCredentials": False} @mock_lambda diff --git a/tests/test_awslambda/test_lambda_invoke.py b/tests/test_awslambda/test_lambda_invoke.py index c228c2668..684045c43 100644 --- a/tests/test_awslambda/test_lambda_invoke.py +++ b/tests/test_awslambda/test_lambda_invoke.py @@ -1,12 +1,11 @@ import base64 -import botocore.client import boto3 import io import json import pytest -import sure # noqa # pylint: disable=unused-import import zipfile +from botocore.exceptions import ClientError from moto import mock_lambda, mock_ec2, settings from uuid import uuid4 from unittest import SkipTest @@ -43,19 +42,19 @@ def test_invoke_function_that_throws_error(): FunctionName=function_name, Payload=json.dumps({}), LogType="Tail" ) - failure_response.should.have.key("FunctionError").being.equal("Handled") + assert failure_response["FunctionError"] == "Handled" payload = failure_response["Payload"].read().decode("utf-8") payload = json.loads(payload) - payload["errorType"].should.equal("Exception") - payload["errorMessage"].should.equal("I failed!") - payload.should.have.key("stackTrace") + assert payload["errorType"] == "Exception" + assert payload["errorMessage"] == "I failed!" + assert "stackTrace" in payload logs = base64.b64decode(failure_response["LogResult"]).decode("utf-8") - logs.should.contain("START RequestId:") - logs.should.contain("I failed!") - logs.should.contain("Traceback (most recent call last):") - logs.should.contain("END RequestId:") + assert "START RequestId:" in logs + assert "I failed!" in logs + assert "Traceback (most recent call last):" in logs + assert "END RequestId:" in logs @pytest.mark.network @@ -93,27 +92,29 @@ def test_invoke_requestresponse_function(invocation_type, key): if "FunctionError" in success_result: assert False, success_result["Payload"].read().decode("utf-8") - success_result["StatusCode"].should.equal(200) - success_result["ResponseMetadata"]["HTTPHeaders"]["content-type"].should.equal( - "application/json" + assert success_result["StatusCode"] == 200 + assert ( + success_result["ResponseMetadata"]["HTTPHeaders"]["content-type"] + == "application/json" ) logs = base64.b64decode(success_result["LogResult"]).decode("utf-8") - logs.should.contain("START RequestId:") - logs.should.contain("custom log event") - logs.should.contain("END RequestId:") + assert "START RequestId:" in logs + assert "custom log event" in logs + assert "END RequestId:" in logs payload = success_result["Payload"].read().decode("utf-8") - json.loads(payload).should.equal(in_data) + assert json.loads(payload) == in_data # Logs should not be returned by default, only when the LogType-param is supplied success_result = conn.invoke( FunctionName=name_or_arn, Payload=json.dumps(in_data), **kw ) - success_result["StatusCode"].should.equal(200) - success_result["ResponseMetadata"]["HTTPHeaders"]["content-type"].should.equal( - "application/json" + assert success_result["StatusCode"] == 200 + assert ( + success_result["ResponseMetadata"]["HTTPHeaders"]["content-type"] + == "application/json" ) assert "LogResult" not in success_result @@ -136,16 +137,15 @@ def test_invoke_event_function(): Publish=True, ) - conn.invoke.when.called_with( - FunctionName="notAFunction", InvocationType="Event", Payload="{}" - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError): + conn.invoke(FunctionName="notAFunction", InvocationType="Event", Payload="{}") in_data = {"msg": "So long and thanks for all the fish"} success_result = conn.invoke( FunctionName=function_name, InvocationType="Event", Payload=json.dumps(in_data) ) - success_result["StatusCode"].should.equal(202) - json.loads(success_result["Payload"].read().decode("utf-8")).should.equal(in_data) + assert success_result["StatusCode"] == 202 + assert json.loads(success_result["Payload"].read().decode("utf-8")) == in_data @pytest.mark.network @@ -167,19 +167,19 @@ def test_invoke_lambda_using_environment_port(): FunctionName=function_name, InvocationType="Event", Payload="{}" ) - success_result["StatusCode"].should.equal(202) + assert success_result["StatusCode"] == 202 response = success_result["Payload"].read() response = json.loads(response.decode("utf-8")) functions = response["functions"] function_names = [f["FunctionName"] for f in functions] - function_names.should.contain(function_name) + assert function_name in function_names # Host matches the full URL, so one of: # http://host.docker.internal:5000 # http://172.0.2.1:5000 # http://172.0.1.1:4555 - response["host"].should.match("http://.+:[0-9]{4}") + assert "http://" in response["host"] @pytest.mark.network @@ -209,7 +209,7 @@ def test_invoke_lambda_using_networkmode(): response = success_result["Payload"].read() functions = json.loads(response.decode("utf-8"))["response"] function_names = [f["FunctionName"] for f in functions] - function_names.should.contain(function_name) + assert function_name in function_names @pytest.mark.network @@ -234,9 +234,9 @@ def test_invoke_function_with_multiple_files_in_zip(): success_result = conn.invoke( FunctionName=function_name, InvocationType="Event", Payload=json.dumps(in_data) ) - json.loads(success_result["Payload"].read().decode("utf-8")).should.equal( - {"msg": "So long and thanks for: stuff"} - ) + assert json.loads(success_result["Payload"].read().decode("utf-8")) == { + "msg": "So long and thanks for: stuff" + } @pytest.mark.network @@ -257,15 +257,14 @@ def test_invoke_dryrun_function(): Publish=True, ) - conn.invoke.when.called_with( - FunctionName="notAFunction", InvocationType="Event", Payload="{}" - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError): + conn.invoke(FunctionName="notAFunction", InvocationType="Event", Payload="{}") in_data = {"msg": "So long and thanks for all the fish"} success_result = conn.invoke( FunctionName=function_name, InvocationType="DryRun", Payload=json.dumps(in_data) ) - success_result["StatusCode"].should.equal(204) + assert success_result["StatusCode"] == 204 if settings.TEST_SERVER_MODE: @@ -297,10 +296,10 @@ if settings.TEST_SERVER_MODE: InvocationType="RequestResponse", Payload=json.dumps(in_data), ) - result["StatusCode"].should.equal(200) + assert result["StatusCode"] == 200 actual_payload = json.loads(result["Payload"].read().decode("utf-8")) expected_payload = {"id": vol.id, "state": vol.state, "size": vol.size} - actual_payload.should.equal(expected_payload) + assert actual_payload == expected_payload @pytest.mark.network @@ -362,7 +361,7 @@ def test_invoke_async_function(key): FunctionName=name_or_arn, InvokeArgs=json.dumps({"test": "event"}) ) - success_result["Status"].should.equal(202) + assert success_result["Status"] == 202 @pytest.mark.network @@ -384,16 +383,14 @@ def test_invoke_function_large_response(): ) resp = conn.invoke(FunctionName=fxn["FunctionArn"]) - resp.should.have.key("FunctionError").equals("Unhandled") + assert resp["FunctionError"] == "Unhandled" payload = resp["Payload"].read().decode("utf-8") payload = json.loads(payload) - payload.should.equal( - { - "errorMessage": "Response payload size exceeded maximum allowed payload size (6291556 bytes).", - "errorType": "Function.ResponseSizeTooLarge", - } - ) + assert payload == { + "errorMessage": "Response payload size exceeded maximum allowed payload size (6291556 bytes).", + "errorType": "Function.ResponseSizeTooLarge", + } # Absolutely fine when invoking async resp = conn.invoke(FunctionName=fxn["FunctionArn"], InvocationType="Event") - resp.shouldnt.have.key("FunctionError") + assert "FunctionError" not in resp diff --git a/tests/test_awslambda/test_lambda_layers.py b/tests/test_awslambda/test_lambda_layers.py index e015ee8ff..f7dfe7f9b 100644 --- a/tests/test_awslambda/test_lambda_layers.py +++ b/tests/test_awslambda/test_lambda_layers.py @@ -1,6 +1,5 @@ import boto3 import pytest -import sure # noqa # pylint: disable=unused-import from botocore.exceptions import ClientError from freezegun import freeze_time @@ -27,8 +26,8 @@ def test_publish_lambda_layers__without_content(): LicenseInfo="MIT", ) err = exc.value.response["Error"] - err["Code"].should.equal("InvalidParameterValueException") - err["Message"].should.equal("Missing Content") + assert err["Code"] == "InvalidParameterValueException" + assert err["Message"] == "Missing Content" @mock_lambda @@ -66,26 +65,24 @@ def test_get_lambda_layers(): version.pop("CreatedDate") result["LayerVersions"].sort(key=lambda x: x["Version"]) expected_arn = f"arn:aws:lambda:{_lambda_region}:{ACCOUNT_ID}:layer:{layer_name}:" - result["LayerVersions"].should.equal( - [ - { - "Version": 1, - "LayerVersionArn": expected_arn + "1", - "CompatibleRuntimes": ["python3.6"], - "Description": "", - "LicenseInfo": "MIT", - "CompatibleArchitectures": [], - }, - { - "Version": 2, - "LayerVersionArn": expected_arn + "2", - "CompatibleRuntimes": ["python3.6"], - "Description": "", - "LicenseInfo": "MIT", - "CompatibleArchitectures": [], - }, - ] - ) + assert result["LayerVersions"] == [ + { + "Version": 1, + "LayerVersionArn": expected_arn + "1", + "CompatibleRuntimes": ["python3.6"], + "Description": "", + "LicenseInfo": "MIT", + "CompatibleArchitectures": [], + }, + { + "Version": 2, + "LayerVersionArn": expected_arn + "2", + "CompatibleRuntimes": ["python3.6"], + "Description": "", + "LicenseInfo": "MIT", + "CompatibleArchitectures": [], + }, + ] function_name = str(uuid4())[0:6] conn.create_function( @@ -103,19 +100,19 @@ def test_get_lambda_layers(): ) result = conn.get_function_configuration(FunctionName=function_name) - result["Layers"].should.equal( - [{"Arn": (expected_arn + "1"), "CodeSize": len(zip_content)}] - ) + assert result["Layers"] == [ + {"Arn": (expected_arn + "1"), "CodeSize": len(zip_content)} + ] result = conn.update_function_configuration( FunctionName=function_name, Layers=[(expected_arn + "2")] ) - result["Layers"].should.equal( - [{"Arn": (expected_arn + "2"), "CodeSize": len(zip_content)}] - ) + assert result["Layers"] == [ + {"Arn": (expected_arn + "2"), "CodeSize": len(zip_content)} + ] # Test get layer versions for non existant layer result = conn.list_layer_versions(LayerName=f"{layer_name}2") - result["LayerVersions"].should.equal([]) + assert result["LayerVersions"] == [] # Test create function with non existant layer version with pytest.raises((ValueError, ClientError)): @@ -159,11 +156,11 @@ def test_get_layer_version(): layer_version = resp["Version"] resp = conn.get_layer_version(LayerName=layer_name, VersionNumber=layer_version) - resp.should.have.key("Description").equals("") - resp.should.have.key("Version").equals(1) - resp.should.have.key("CompatibleArchitectures").equals(["x86_64"]) - resp.should.have.key("CompatibleRuntimes").equals(["python3.6"]) - resp.should.have.key("LicenseInfo").equals("MIT") + assert resp["Description"] == "" + assert resp["Version"] == 1 + assert resp["CompatibleArchitectures"] == ["x86_64"] + assert resp["CompatibleRuntimes"] == ["python3.6"] + assert resp["LicenseInfo"] == "MIT" @mock_lambda @@ -185,7 +182,7 @@ def test_get_layer_version__unknown(): with pytest.raises(ClientError) as exc: conn.get_layer_version(LayerName=layer_name, VersionNumber=1) err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") + assert err["Code"] == "ResourceNotFoundException" conn.publish_layer_version( LayerName=layer_name, @@ -198,7 +195,7 @@ def test_get_layer_version__unknown(): with pytest.raises(ClientError) as exc: conn.get_layer_version(LayerName=layer_name, VersionNumber=999) err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") + assert err["Code"] == "ResourceNotFoundException" @mock_lambda @@ -227,7 +224,7 @@ def test_delete_layer_version(): conn.delete_layer_version(LayerName=layer_name, VersionNumber=layer_version) result = conn.list_layer_versions(LayerName=layer_name)["LayerVersions"] - result.should.equal([]) + assert result == [] @mock_lambda diff --git a/tests/test_awslambda/test_lambda_policy.py b/tests/test_awslambda/test_lambda_policy.py index 971764f1e..39f3145d5 100644 --- a/tests/test_awslambda/test_lambda_policy.py +++ b/tests/test_awslambda/test_lambda_policy.py @@ -1,6 +1,5 @@ import boto3 import json -import sure # noqa # pylint: disable=unused-import import pytest from botocore.exceptions import ClientError @@ -41,13 +40,11 @@ def test_add_function_permission(key): assert "Statement" in response res = json.loads(response["Statement"]) assert res["Action"] == "lambda:InvokeFunction" - res["Condition"].should.equal( - { - "ArnLike": { - "AWS:SourceArn": "arn:aws:lambda:us-west-2:account-id:function:helloworld" - } + assert res["Condition"] == { + "ArnLike": { + "AWS:SourceArn": "arn:aws:lambda:us-west-2:account-id:function:helloworld" } - ) + } @mock_lambda @@ -75,11 +72,9 @@ def test_add_permission_with_principalorgid(): assert "Statement" in response res = json.loads(response["Statement"]) - res["Condition"].should.have.key("StringEquals").equals( - {"aws:PrincipalOrgID": "o-a1b2c3d4e5"} - ) - res["Condition"].should.have.key("ArnLike").equals({"AWS:SourceArn": source_arn}) - res.shouldnt.have.key("PrincipalOrgID") + assert res["Condition"]["StringEquals"] == {"aws:PrincipalOrgID": "o-a1b2c3d4e5"} + assert res["Condition"]["ArnLike"] == {"AWS:SourceArn": source_arn} + assert "PrincipalOrgID" not in res @pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"]) @@ -194,9 +189,10 @@ def test_add_permission_with_unknown_qualifier(): Qualifier="5", ) err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") - err["Message"].should.equal( - f"Function not found: arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:5" + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] + == f"Function not found: arn:aws:lambda:us-west-2:{ACCOUNT_ID}:function:{function_name}:5" ) @@ -224,10 +220,10 @@ def test_remove_function_permission(key): ) remove = conn.remove_permission(FunctionName=name_or_arn, StatementId="1") - remove["ResponseMetadata"]["HTTPStatusCode"].should.equal(204) + assert remove["ResponseMetadata"]["HTTPStatusCode"] == 204 policy = conn.get_policy(FunctionName=name_or_arn)["Policy"] policy = json.loads(policy) - policy["Statement"].should.equal([]) + assert policy["Statement"] == [] @pytest.mark.parametrize("key", ["FunctionName", "FunctionArn"]) @@ -269,10 +265,10 @@ def test_remove_function_permission__with_qualifier(key): remove = conn.remove_permission( FunctionName=name_or_arn, StatementId="1", Qualifier="2" ) - remove["ResponseMetadata"]["HTTPStatusCode"].should.equal(204) + assert remove["ResponseMetadata"]["HTTPStatusCode"] == 204 policy = conn.get_policy(FunctionName=name_or_arn, Qualifier="2")["Policy"] policy = json.loads(policy) - policy["Statement"].should.equal([]) + assert policy["Statement"] == [] @mock_lambda @@ -283,7 +279,8 @@ def test_get_unknown_policy(): with pytest.raises(ClientError) as exc: conn.get_policy(FunctionName="unknown") err = exc.value.response["Error"] - err["Code"].should.equal("ResourceNotFoundException") - err["Message"].should.equal( - "Function not found: arn:aws:lambda:us-west-2:123456789012:function:unknown" + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] + == "Function not found: arn:aws:lambda:us-west-2:123456789012:function:unknown" ) diff --git a/tests/test_awslambda/test_lambda_tags.py b/tests/test_awslambda/test_lambda_tags.py index fa6cbcaaf..7725e9609 100644 --- a/tests/test_awslambda/test_lambda_tags.py +++ b/tests/test_awslambda/test_lambda_tags.py @@ -1,7 +1,7 @@ -import botocore.client import boto3 -import sure # noqa # pylint: disable=unused-import +import pytest +from botocore.exceptions import ClientError from moto import mock_lambda, mock_s3 from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from uuid import uuid4 @@ -39,36 +39,31 @@ def test_tags(): ) # List tags when there are none - conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal(dict()) + assert conn.list_tags(Resource=function["FunctionArn"])["Tags"] == dict() # List tags when there is one - conn.tag_resource(Resource=function["FunctionArn"], Tags=dict(spam="eggs"))[ - "ResponseMetadata" - ]["HTTPStatusCode"].should.equal(200) - conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal( - dict(spam="eggs") - ) + resp = conn.tag_resource(Resource=function["FunctionArn"], Tags={"spam": "eggs"}) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert conn.list_tags(Resource=function["FunctionArn"])["Tags"] == {"spam": "eggs"} # List tags when another has been added - conn.tag_resource(Resource=function["FunctionArn"], Tags=dict(foo="bar"))[ - "ResponseMetadata" - ]["HTTPStatusCode"].should.equal(200) - conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal( - dict(spam="eggs", foo="bar") - ) + resp = conn.tag_resource(Resource=function["FunctionArn"], Tags=dict(foo="bar")) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 + assert conn.list_tags(Resource=function["FunctionArn"])["Tags"] == { + "spam": "eggs", + "foo": "bar", + } # Untag resource - conn.untag_resource(Resource=function["FunctionArn"], TagKeys=["spam", "trolls"])[ - "ResponseMetadata" - ]["HTTPStatusCode"].should.equal(204) - conn.list_tags(Resource=function["FunctionArn"])["Tags"].should.equal( - dict(foo="bar") + resp = conn.untag_resource( + Resource=function["FunctionArn"], TagKeys=["spam", "trolls"] ) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204 + assert conn.list_tags(Resource=function["FunctionArn"])["Tags"] == {"foo": "bar"} # Untag a tag that does not exist (no error and no change) - conn.untag_resource(Resource=function["FunctionArn"], TagKeys=["spam"])[ - "ResponseMetadata" - ]["HTTPStatusCode"].should.equal(204) + resp = conn.untag_resource(Resource=function["FunctionArn"], TagKeys=["spam"]) + assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204 @mock_lambda @@ -93,10 +88,10 @@ def test_create_function_with_tags(): ) tags = conn.list_tags(Resource=function["FunctionArn"])["Tags"] - tags.should.equal({"key1": "val1", "key2": "val2"}) + assert tags == {"key1": "val1", "key2": "val2"} result = conn.get_function(FunctionName=function_name) - result.should.have.key("Tags").equals({"key1": "val1", "key2": "val2"}) + assert result["Tags"] == {"key1": "val1", "key2": "val2"} @mock_lambda @@ -105,16 +100,17 @@ def test_tags_not_found(): Test list_tags and tag_resource when the lambda with the given arn does not exist """ conn = boto3.client("lambda", _lambda_region) - conn.list_tags.when.called_with( - Resource=f"arn:aws:lambda:{ACCOUNT_ID}:function:not-found" - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError): + conn.list_tags(Resource=f"arn:aws:lambda:{ACCOUNT_ID}:function:not-found") - conn.tag_resource.when.called_with( - Resource=f"arn:aws:lambda:{ACCOUNT_ID}:function:not-found", - Tags=dict(spam="eggs"), - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError): + conn.tag_resource( + Resource=f"arn:aws:lambda:{ACCOUNT_ID}:function:not-found", + Tags=dict(spam="eggs"), + ) - conn.untag_resource.when.called_with( - Resource=f"arn:aws:lambda:{ACCOUNT_ID}:function:not-found", - TagKeys=["spam"], - ).should.throw(botocore.client.ClientError) + with pytest.raises(ClientError): + conn.untag_resource( + Resource=f"arn:aws:lambda:{ACCOUNT_ID}:function:not-found", + TagKeys=["spam"], + ) diff --git a/tests/test_awslambda/test_policy.py b/tests/test_awslambda/test_policy.py index 0ad83659e..64b57f4fe 100644 --- a/tests/test_awslambda/test_policy.py +++ b/tests/test_awslambda/test_policy.py @@ -1,5 +1,4 @@ import json -import sure # noqa # pylint: disable=unused-import from moto.awslambda.policy import Policy @@ -37,11 +36,11 @@ def test_policy(): } policy.add_statement(json.dumps(statement)) - expected.should.be.equal(policy.statements[0]) + assert expected == policy.statements[0] sid = statement.get("StatementId", None) if sid is None: raise "TestCase.statement does not contain StatementId" policy.del_statement(sid) - [].should.be.equal(policy.statements) + assert [] == policy.statements