diff --git a/tests/test_s3control/test_s3control.py b/tests/test_s3control/test_s3control.py index 06f231635..fbf925c21 100644 --- a/tests/test_s3control/test_s3control.py +++ b/tests/test_s3control/test_s3control.py @@ -1,6 +1,5 @@ import boto3 import pytest -import sure # noqa # pylint: disable=unused-import from boto3 import Session from botocore.client import ClientError @@ -14,32 +13,34 @@ def test_get_public_access_block_for_account(): client = boto3.client("s3control", region_name="us-west-2") # With an invalid account ID: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: client.get_public_access_block(AccountId="111111111111") - assert ce.value.response["Error"]["Code"] == "AccessDenied" + assert ce_err.value.response["Error"]["Code"] == "AccessDenied" # Without one defined: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: client.get_public_access_block(AccountId=ACCOUNT_ID) - assert ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration" + assert ( + ce_err.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration" + ) # Put a with an invalid account ID: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: client.put_public_access_block( AccountId="111111111111", PublicAccessBlockConfiguration={"BlockPublicAcls": True}, ) - assert ce.value.response["Error"]["Code"] == "AccessDenied" + assert ce_err.value.response["Error"]["Code"] == "AccessDenied" # Put with an invalid PAB: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: client.put_public_access_block( AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={} ) - assert ce.value.response["Error"]["Code"] == "InvalidRequest" + assert ce_err.value.response["Error"]["Code"] == "InvalidRequest" assert ( "Must specify at least one configuration." - in ce.value.response["Error"]["Message"] + in ce_err.value.response["Error"]["Message"] ) # Correct PAB: @@ -66,14 +67,16 @@ def test_get_public_access_block_for_account(): } # Delete with an invalid account ID: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: client.delete_public_access_block(AccountId="111111111111") - assert ce.value.response["Error"]["Code"] == "AccessDenied" + assert ce_err.value.response["Error"]["Code"] == "AccessDenied" # Delete successfully: client.delete_public_access_block(AccountId=ACCOUNT_ID) # Confirm that it's deleted: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: client.get_public_access_block(AccountId=ACCOUNT_ID) - assert ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration" + assert ( + ce_err.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration" + ) diff --git a/tests/test_s3control/test_s3control_access_points.py b/tests/test_s3control/test_s3control_access_points.py index e92d991f2..7e7bc0993 100644 --- a/tests/test_s3control/test_s3control_access_points.py +++ b/tests/test_s3control/test_s3control_access_points.py @@ -1,6 +1,7 @@ +import re + import boto3 import pytest -import sure # noqa # pylint: disable=unused-import from botocore.client import ClientError from moto import mock_s3control @@ -13,8 +14,8 @@ def test_create_access_point(): AccountId="111111111111", Name="ap_name", Bucket="mybucket" ) - resp.should.have.key("AccessPointArn") - resp.should.have.key("Alias").equals("ap_name") + assert "AccessPointArn" in resp + assert resp["Alias"] == "ap_name" @mock_s3control @@ -24,9 +25,9 @@ def test_get_unknown_access_point(): with pytest.raises(ClientError) as exc: client.get_access_point(AccountId="111111111111", Name="ap_name") err = exc.value.response["Error"] - err["Code"].should.equal("NoSuchAccessPoint") - err["Message"].should.equal("The specified accesspoint does not exist") - err["AccessPointName"].should.equal("ap_name") + assert err["Code"] == "NoSuchAccessPoint" + assert err["Message"] == "The specified accesspoint does not exist" + assert err["AccessPointName"] == "ap_name" @mock_s3control @@ -38,34 +39,29 @@ def test_get_access_point_minimal(): resp = client.get_access_point(AccountId="111111111111", Name="ap_name") - resp.should.have.key("Name").equals("ap_name") - resp.should.have.key("Bucket").equals("mybucket") - resp.should.have.key("NetworkOrigin").equals("Internet") - resp.should.have.key("PublicAccessBlockConfiguration").equals( - { - "BlockPublicAcls": True, - "IgnorePublicAcls": True, - "BlockPublicPolicy": True, - "RestrictPublicBuckets": True, - } - ) - resp.should.have.key("CreationDate") - resp.should.have.key("Alias").match("ap_name-[a-z0-9]+-s3alias") - resp.should.have.key("AccessPointArn").equals( + assert resp["Name"] == "ap_name" + assert resp["Bucket"] == "mybucket" + assert resp["NetworkOrigin"] == "Internet" + assert resp["PublicAccessBlockConfiguration"] == { + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": True, + } + assert "CreationDate" in resp + assert "Alias" in resp + assert re.match("ap_name-[a-z0-9]+-s3alias", resp["Alias"]) + assert resp["AccessPointArn"] == ( "arn:aws:s3:us-east-1:111111111111:accesspoint/ap_name" ) - resp.should.have.key("Endpoints") + assert "Endpoints" in resp - resp["Endpoints"].should.have.key("ipv4").equals( - "s3-accesspoint.us-east-1.amazonaws.com" - ) - resp["Endpoints"].should.have.key("fips").equals( - "s3-accesspoint-fips.us-east-1.amazonaws.com" - ) - resp["Endpoints"].should.have.key("fips_dualstack").equals( + assert resp["Endpoints"]["ipv4"] == "s3-accesspoint.us-east-1.amazonaws.com" + assert resp["Endpoints"]["fips"] == "s3-accesspoint-fips.us-east-1.amazonaws.com" + assert resp["Endpoints"]["fips_dualstack"] == ( "s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com" ) - resp["Endpoints"].should.have.key("dualstack").equals( + assert resp["Endpoints"]["dualstack"] == ( "s3-accesspoint.dualstack.us-east-1.amazonaws.com" ) @@ -88,18 +84,16 @@ def test_get_access_point_full(): resp = client.get_access_point(AccountId="111111111111", Name="ap_name") - resp.should.have.key("Name").equals("ap_name") - resp.should.have.key("Bucket").equals("mybucket") - resp.should.have.key("NetworkOrigin").equals("VPC") - resp.should.have.key("VpcConfiguration").equals({"VpcId": "sth"}) - resp.should.have.key("PublicAccessBlockConfiguration").equals( - { - "BlockPublicAcls": False, - "IgnorePublicAcls": False, - "BlockPublicPolicy": False, - "RestrictPublicBuckets": False, - } - ) + assert resp["Name"] == "ap_name" + assert resp["Bucket"] == "mybucket" + assert resp["NetworkOrigin"] == "VPC" + assert resp["VpcConfiguration"] == {"VpcId": "sth"} + assert resp["PublicAccessBlockConfiguration"] == { + "BlockPublicAcls": False, + "IgnorePublicAcls": False, + "BlockPublicPolicy": False, + "RestrictPublicBuckets": False, + } @mock_s3control @@ -115,4 +109,4 @@ def test_delete_access_point(): with pytest.raises(ClientError) as exc: client.get_access_point(AccountId="111111111111", Name="ap_name") err = exc.value.response["Error"] - err["Code"].should.equal("NoSuchAccessPoint") + assert err["Code"] == "NoSuchAccessPoint" diff --git a/tests/test_s3control/test_s3control_accesspoint_policy.py b/tests/test_s3control/test_s3control_accesspoint_policy.py index b6a2589a5..a2e6e7f36 100644 --- a/tests/test_s3control/test_s3control_accesspoint_policy.py +++ b/tests/test_s3control/test_s3control_accesspoint_policy.py @@ -1,6 +1,5 @@ import boto3 import pytest -import sure # noqa # pylint: disable=unused-import from botocore.client import ClientError from moto import mock_s3control @@ -32,7 +31,8 @@ def test_get_access_point_policy(): ) resp = client.get_access_point_policy(AccountId="111111111111", Name="ap_name") - resp.should.have.key("Policy").equals(policy) + assert "Policy" in resp + assert resp["Policy"] == policy @mock_s3control @@ -45,9 +45,9 @@ def test_get_unknown_access_point_policy(): with pytest.raises(ClientError) as exc: client.get_access_point_policy(AccountId="111111111111", Name="ap_name") err = exc.value.response["Error"] - err["Code"].should.equal("NoSuchAccessPointPolicy") - err["Message"].should.equal("The specified accesspoint policy does not exist") - err["AccessPointName"].should.equal("ap_name") + assert err["Code"] == "NoSuchAccessPointPolicy" + assert err["Message"] == "The specified accesspoint policy does not exist" + assert err["AccessPointName"] == "ap_name" @mock_s3control @@ -78,7 +78,8 @@ def test_get_access_point_policy_status(): resp = client.get_access_point_policy_status( AccountId="111111111111", Name="ap_name" ) - resp.should.have.key("PolicyStatus").equals({"IsPublic": True}) + assert "PolicyStatus" in resp + assert resp["PolicyStatus"] == {"IsPublic": True} @mock_s3control @@ -98,7 +99,7 @@ def test_delete_access_point_policy(): with pytest.raises(ClientError) as exc: client.get_access_point_policy(AccountId="111111111111", Name="ap_name") err = exc.value.response["Error"] - err["Code"].should.equal("NoSuchAccessPointPolicy") + assert err["Code"] == "NoSuchAccessPointPolicy" @mock_s3control @@ -111,6 +112,6 @@ def test_get_unknown_access_point_policy_status(): with pytest.raises(ClientError) as exc: client.get_access_point_policy_status(AccountId="111111111111", Name="ap_name") err = exc.value.response["Error"] - err["Code"].should.equal("NoSuchAccessPointPolicy") - err["Message"].should.equal("The specified accesspoint policy does not exist") - err["AccessPointName"].should.equal("ap_name") + assert err["Code"] == "NoSuchAccessPointPolicy" + assert err["Message"] == "The specified accesspoint policy does not exist" + assert err["AccessPointName"] == "ap_name" diff --git a/tests/test_s3control/test_s3control_config_integration.py b/tests/test_s3control/test_s3control_config_integration.py index 87d45b526..ec5ce3cb0 100644 --- a/tests/test_s3control/test_s3control_config_integration.py +++ b/tests/test_s3control/test_s3control_config_integration.py @@ -1,8 +1,7 @@ -import boto3 import json import pytest -import sure # noqa # pylint: disable=unused-import +import boto3 from boto3 import Session from botocore.client import ClientError from moto import settings, mock_s3control, mock_config @@ -90,17 +89,17 @@ if not settings.TEST_SERVER_MODE: ResourceType="AWS::S3::AccountPublicAccessBlock", ConfigurationAggregatorName="testing", ) - regions = {region for region in Session().get_available_regions("config")} - for r in result["ResourceIdentifiers"]: - regions.remove(r.pop("SourceRegion")) - assert r == { + regions = set(Session().get_available_regions("config")) + for resource in result["ResourceIdentifiers"]: + regions.remove(resource.pop("SourceRegion")) + assert resource == { "ResourceType": "AWS::S3::AccountPublicAccessBlock", "SourceAccountId": ACCOUNT_ID, "ResourceId": ACCOUNT_ID, } # Just check that the len is the same -- this should be reasonable - regions = {region for region in Session().get_available_regions("config")} + regions = set(Session().get_available_regions("config")) result = config_client.list_aggregate_discovered_resources( ResourceType="AWS::S3::AccountPublicAccessBlock", ConfigurationAggregatorName="testing", @@ -133,9 +132,7 @@ if not settings.TEST_SERVER_MODE: ConfigurationAggregatorName="testing", Limit=1, ) - regions = sorted( - [region for region in Session().get_available_regions("config")] - ) + regions = sorted(set(Session().get_available_regions("config"))) assert result["ResourceIdentifiers"][0] == { "ResourceType": "AWS::S3::AccountPublicAccessBlock", "SourceAccountId": ACCOUNT_ID, @@ -207,11 +204,13 @@ if not settings.TEST_SERVER_MODE: ) # Without a PAB in place: - with pytest.raises(ClientError) as ce: + with pytest.raises(ClientError) as ce_err: config_client.get_resource_config_history( resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID ) - assert ce.value.response["Error"]["Code"] == "ResourceNotDiscoveredException" + assert ( + ce_err.value.response["Error"]["Code"] == "ResourceNotDiscoveredException" + ) # aggregate result = config_client.batch_get_resource_config( resourceKeys=[ diff --git a/tests/test_s3control/test_s3control_s3.py b/tests/test_s3control/test_s3control_s3.py index 7547bfef5..7947496ac 100644 --- a/tests/test_s3control/test_s3control_s3.py +++ b/tests/test_s3control/test_s3control_s3.py @@ -1,6 +1,5 @@ """Test that using both s3 and s3control do not interfere""" import boto3 -import sure # noqa # pylint: disable=unused-import from moto import mock_s3, mock_s3control, settings from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID @@ -12,8 +11,8 @@ if not settings.TEST_SERVER_MODE: @mock_s3control def test_pab_are_kept_separate(): client = boto3.client("s3control", region_name="us-east-1") - s3 = boto3.client("s3", region_name="us-east-1") - s3.create_bucket(Bucket="bucket") + s3_client = boto3.client("s3", region_name="us-east-1") + s3_client.create_bucket(Bucket="bucket") client.put_public_access_block( AccountId=ACCOUNT_ID, @@ -25,7 +24,7 @@ if not settings.TEST_SERVER_MODE: }, ) - s3.put_public_access_block( + s3_client.put_public_access_block( Bucket="bucket", PublicAccessBlockConfiguration={ "BlockPublicAcls": True, @@ -36,31 +35,27 @@ if not settings.TEST_SERVER_MODE: ) pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID) - pab_from_control.should.have.key("PublicAccessBlockConfiguration").equals( - { - "BlockPublicAcls": True, - "IgnorePublicAcls": True, - "BlockPublicPolicy": True, - "RestrictPublicBuckets": True, - } - ) + assert pab_from_control["PublicAccessBlockConfiguration"] == { + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": True, + } - pab_from_s3 = s3.get_public_access_block(Bucket="bucket") - pab_from_s3.should.have.key("PublicAccessBlockConfiguration").equals( - { - "BlockPublicAcls": True, - "IgnorePublicAcls": False, - "BlockPublicPolicy": True, - "RestrictPublicBuckets": False, - } - ) + pab_from_s3 = s3_client.get_public_access_block(Bucket="bucket") + assert pab_from_s3["PublicAccessBlockConfiguration"] == { + "BlockPublicAcls": True, + "IgnorePublicAcls": False, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": False, + } @mock_s3control @mock_s3 def test_pab_are_kept_separate_with_inverse_mocks(): client = boto3.client("s3control", region_name="us-east-1") - s3 = boto3.client("s3", region_name="us-east-1") - s3.create_bucket(Bucket="bucket") + s3_client = boto3.client("s3", region_name="us-east-1") + s3_client.create_bucket(Bucket="bucket") client.put_public_access_block( AccountId=ACCOUNT_ID, @@ -72,7 +67,7 @@ if not settings.TEST_SERVER_MODE: }, ) - s3.put_public_access_block( + s3_client.put_public_access_block( Bucket="bucket", PublicAccessBlockConfiguration={ "BlockPublicAcls": True, @@ -83,21 +78,17 @@ if not settings.TEST_SERVER_MODE: ) pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID) - pab_from_control.should.have.key("PublicAccessBlockConfiguration").equals( - { - "BlockPublicAcls": True, - "IgnorePublicAcls": True, - "BlockPublicPolicy": True, - "RestrictPublicBuckets": True, - } - ) + assert pab_from_control["PublicAccessBlockConfiguration"] == { + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": True, + } - pab_from_s3 = s3.get_public_access_block(Bucket="bucket") - pab_from_s3.should.have.key("PublicAccessBlockConfiguration").equals( - { - "BlockPublicAcls": True, - "IgnorePublicAcls": False, - "BlockPublicPolicy": True, - "RestrictPublicBuckets": False, - } - ) + pab_from_s3 = s3_client.get_public_access_block(Bucket="bucket") + assert pab_from_s3["PublicAccessBlockConfiguration"] == { + "BlockPublicAcls": True, + "IgnorePublicAcls": False, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": False, + }