Replace sure with regular assertions in s3control (#6600)

This commit is contained in:
kbalk 2023-08-04 17:51:28 -04:00 committed by GitHub
parent 38827b29b8
commit fc53fd6950
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 119 deletions

View File

@ -1,6 +1,5 @@
import boto3 import boto3
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from boto3 import Session from boto3 import Session
from botocore.client import ClientError from botocore.client import ClientError
@ -14,32 +13,34 @@ def test_get_public_access_block_for_account():
client = boto3.client("s3control", region_name="us-west-2") client = boto3.client("s3control", region_name="us-west-2")
# With an invalid account ID: # With an invalid account ID:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
client.get_public_access_block(AccountId="111111111111") client.get_public_access_block(AccountId="111111111111")
assert ce.value.response["Error"]["Code"] == "AccessDenied" assert ce_err.value.response["Error"]["Code"] == "AccessDenied"
# Without one defined: # Without one defined:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
client.get_public_access_block(AccountId=ACCOUNT_ID) client.get_public_access_block(AccountId=ACCOUNT_ID)
assert ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration" assert (
ce_err.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)
# Put a with an invalid account ID: # Put a with an invalid account ID:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
client.put_public_access_block( client.put_public_access_block(
AccountId="111111111111", AccountId="111111111111",
PublicAccessBlockConfiguration={"BlockPublicAcls": True}, PublicAccessBlockConfiguration={"BlockPublicAcls": True},
) )
assert ce.value.response["Error"]["Code"] == "AccessDenied" assert ce_err.value.response["Error"]["Code"] == "AccessDenied"
# Put with an invalid PAB: # Put with an invalid PAB:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
client.put_public_access_block( client.put_public_access_block(
AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={} AccountId=ACCOUNT_ID, PublicAccessBlockConfiguration={}
) )
assert ce.value.response["Error"]["Code"] == "InvalidRequest" assert ce_err.value.response["Error"]["Code"] == "InvalidRequest"
assert ( assert (
"Must specify at least one configuration." "Must specify at least one configuration."
in ce.value.response["Error"]["Message"] in ce_err.value.response["Error"]["Message"]
) )
# Correct PAB: # Correct PAB:
@ -66,14 +67,16 @@ def test_get_public_access_block_for_account():
} }
# Delete with an invalid account ID: # Delete with an invalid account ID:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
client.delete_public_access_block(AccountId="111111111111") client.delete_public_access_block(AccountId="111111111111")
assert ce.value.response["Error"]["Code"] == "AccessDenied" assert ce_err.value.response["Error"]["Code"] == "AccessDenied"
# Delete successfully: # Delete successfully:
client.delete_public_access_block(AccountId=ACCOUNT_ID) client.delete_public_access_block(AccountId=ACCOUNT_ID)
# Confirm that it's deleted: # Confirm that it's deleted:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
client.get_public_access_block(AccountId=ACCOUNT_ID) client.get_public_access_block(AccountId=ACCOUNT_ID)
assert ce.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration" assert (
ce_err.value.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration"
)

View File

@ -1,6 +1,7 @@
import re
import boto3 import boto3
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError from botocore.client import ClientError
from moto import mock_s3control from moto import mock_s3control
@ -13,8 +14,8 @@ def test_create_access_point():
AccountId="111111111111", Name="ap_name", Bucket="mybucket" AccountId="111111111111", Name="ap_name", Bucket="mybucket"
) )
resp.should.have.key("AccessPointArn") assert "AccessPointArn" in resp
resp.should.have.key("Alias").equals("ap_name") assert resp["Alias"] == "ap_name"
@mock_s3control @mock_s3control
@ -24,9 +25,9 @@ def test_get_unknown_access_point():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_access_point(AccountId="111111111111", Name="ap_name") client.get_access_point(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPoint") assert err["Code"] == "NoSuchAccessPoint"
err["Message"].should.equal("The specified accesspoint does not exist") assert err["Message"] == "The specified accesspoint does not exist"
err["AccessPointName"].should.equal("ap_name") assert err["AccessPointName"] == "ap_name"
@mock_s3control @mock_s3control
@ -38,34 +39,29 @@ def test_get_access_point_minimal():
resp = client.get_access_point(AccountId="111111111111", Name="ap_name") resp = client.get_access_point(AccountId="111111111111", Name="ap_name")
resp.should.have.key("Name").equals("ap_name") assert resp["Name"] == "ap_name"
resp.should.have.key("Bucket").equals("mybucket") assert resp["Bucket"] == "mybucket"
resp.should.have.key("NetworkOrigin").equals("Internet") assert resp["NetworkOrigin"] == "Internet"
resp.should.have.key("PublicAccessBlockConfiguration").equals( assert resp["PublicAccessBlockConfiguration"] == {
{
"BlockPublicAcls": True, "BlockPublicAcls": True,
"IgnorePublicAcls": True, "IgnorePublicAcls": True,
"BlockPublicPolicy": True, "BlockPublicPolicy": True,
"RestrictPublicBuckets": True, "RestrictPublicBuckets": True,
} }
) assert "CreationDate" in resp
resp.should.have.key("CreationDate") assert "Alias" in resp
resp.should.have.key("Alias").match("ap_name-[a-z0-9]+-s3alias") assert re.match("ap_name-[a-z0-9]+-s3alias", resp["Alias"])
resp.should.have.key("AccessPointArn").equals( assert resp["AccessPointArn"] == (
"arn:aws:s3:us-east-1:111111111111:accesspoint/ap_name" "arn:aws:s3:us-east-1:111111111111:accesspoint/ap_name"
) )
resp.should.have.key("Endpoints") assert "Endpoints" in resp
resp["Endpoints"].should.have.key("ipv4").equals( assert resp["Endpoints"]["ipv4"] == "s3-accesspoint.us-east-1.amazonaws.com"
"s3-accesspoint.us-east-1.amazonaws.com" assert resp["Endpoints"]["fips"] == "s3-accesspoint-fips.us-east-1.amazonaws.com"
) assert resp["Endpoints"]["fips_dualstack"] == (
resp["Endpoints"].should.have.key("fips").equals(
"s3-accesspoint-fips.us-east-1.amazonaws.com"
)
resp["Endpoints"].should.have.key("fips_dualstack").equals(
"s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com" "s3-accesspoint-fips.dualstack.us-east-1.amazonaws.com"
) )
resp["Endpoints"].should.have.key("dualstack").equals( assert resp["Endpoints"]["dualstack"] == (
"s3-accesspoint.dualstack.us-east-1.amazonaws.com" "s3-accesspoint.dualstack.us-east-1.amazonaws.com"
) )
@ -88,18 +84,16 @@ def test_get_access_point_full():
resp = client.get_access_point(AccountId="111111111111", Name="ap_name") resp = client.get_access_point(AccountId="111111111111", Name="ap_name")
resp.should.have.key("Name").equals("ap_name") assert resp["Name"] == "ap_name"
resp.should.have.key("Bucket").equals("mybucket") assert resp["Bucket"] == "mybucket"
resp.should.have.key("NetworkOrigin").equals("VPC") assert resp["NetworkOrigin"] == "VPC"
resp.should.have.key("VpcConfiguration").equals({"VpcId": "sth"}) assert resp["VpcConfiguration"] == {"VpcId": "sth"}
resp.should.have.key("PublicAccessBlockConfiguration").equals( assert resp["PublicAccessBlockConfiguration"] == {
{
"BlockPublicAcls": False, "BlockPublicAcls": False,
"IgnorePublicAcls": False, "IgnorePublicAcls": False,
"BlockPublicPolicy": False, "BlockPublicPolicy": False,
"RestrictPublicBuckets": False, "RestrictPublicBuckets": False,
} }
)
@mock_s3control @mock_s3control
@ -115,4 +109,4 @@ def test_delete_access_point():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_access_point(AccountId="111111111111", Name="ap_name") client.get_access_point(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPoint") assert err["Code"] == "NoSuchAccessPoint"

View File

@ -1,6 +1,5 @@
import boto3 import boto3
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
from botocore.client import ClientError from botocore.client import ClientError
from moto import mock_s3control from moto import mock_s3control
@ -32,7 +31,8 @@ def test_get_access_point_policy():
) )
resp = client.get_access_point_policy(AccountId="111111111111", Name="ap_name") resp = client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
resp.should.have.key("Policy").equals(policy) assert "Policy" in resp
assert resp["Policy"] == policy
@mock_s3control @mock_s3control
@ -45,9 +45,9 @@ def test_get_unknown_access_point_policy():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_access_point_policy(AccountId="111111111111", Name="ap_name") client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPointPolicy") assert err["Code"] == "NoSuchAccessPointPolicy"
err["Message"].should.equal("The specified accesspoint policy does not exist") assert err["Message"] == "The specified accesspoint policy does not exist"
err["AccessPointName"].should.equal("ap_name") assert err["AccessPointName"] == "ap_name"
@mock_s3control @mock_s3control
@ -78,7 +78,8 @@ def test_get_access_point_policy_status():
resp = client.get_access_point_policy_status( resp = client.get_access_point_policy_status(
AccountId="111111111111", Name="ap_name" AccountId="111111111111", Name="ap_name"
) )
resp.should.have.key("PolicyStatus").equals({"IsPublic": True}) assert "PolicyStatus" in resp
assert resp["PolicyStatus"] == {"IsPublic": True}
@mock_s3control @mock_s3control
@ -98,7 +99,7 @@ def test_delete_access_point_policy():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_access_point_policy(AccountId="111111111111", Name="ap_name") client.get_access_point_policy(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPointPolicy") assert err["Code"] == "NoSuchAccessPointPolicy"
@mock_s3control @mock_s3control
@ -111,6 +112,6 @@ def test_get_unknown_access_point_policy_status():
with pytest.raises(ClientError) as exc: with pytest.raises(ClientError) as exc:
client.get_access_point_policy_status(AccountId="111111111111", Name="ap_name") client.get_access_point_policy_status(AccountId="111111111111", Name="ap_name")
err = exc.value.response["Error"] err = exc.value.response["Error"]
err["Code"].should.equal("NoSuchAccessPointPolicy") assert err["Code"] == "NoSuchAccessPointPolicy"
err["Message"].should.equal("The specified accesspoint policy does not exist") assert err["Message"] == "The specified accesspoint policy does not exist"
err["AccessPointName"].should.equal("ap_name") assert err["AccessPointName"] == "ap_name"

View File

@ -1,8 +1,7 @@
import boto3
import json import json
import pytest import pytest
import sure # noqa # pylint: disable=unused-import
import boto3
from boto3 import Session from boto3 import Session
from botocore.client import ClientError from botocore.client import ClientError
from moto import settings, mock_s3control, mock_config from moto import settings, mock_s3control, mock_config
@ -90,17 +89,17 @@ if not settings.TEST_SERVER_MODE:
ResourceType="AWS::S3::AccountPublicAccessBlock", ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing", ConfigurationAggregatorName="testing",
) )
regions = {region for region in Session().get_available_regions("config")} regions = set(Session().get_available_regions("config"))
for r in result["ResourceIdentifiers"]: for resource in result["ResourceIdentifiers"]:
regions.remove(r.pop("SourceRegion")) regions.remove(resource.pop("SourceRegion"))
assert r == { assert resource == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock", "ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID, "SourceAccountId": ACCOUNT_ID,
"ResourceId": ACCOUNT_ID, "ResourceId": ACCOUNT_ID,
} }
# Just check that the len is the same -- this should be reasonable # Just check that the len is the same -- this should be reasonable
regions = {region for region in Session().get_available_regions("config")} regions = set(Session().get_available_regions("config"))
result = config_client.list_aggregate_discovered_resources( result = config_client.list_aggregate_discovered_resources(
ResourceType="AWS::S3::AccountPublicAccessBlock", ResourceType="AWS::S3::AccountPublicAccessBlock",
ConfigurationAggregatorName="testing", ConfigurationAggregatorName="testing",
@ -133,9 +132,7 @@ if not settings.TEST_SERVER_MODE:
ConfigurationAggregatorName="testing", ConfigurationAggregatorName="testing",
Limit=1, Limit=1,
) )
regions = sorted( regions = sorted(set(Session().get_available_regions("config")))
[region for region in Session().get_available_regions("config")]
)
assert result["ResourceIdentifiers"][0] == { assert result["ResourceIdentifiers"][0] == {
"ResourceType": "AWS::S3::AccountPublicAccessBlock", "ResourceType": "AWS::S3::AccountPublicAccessBlock",
"SourceAccountId": ACCOUNT_ID, "SourceAccountId": ACCOUNT_ID,
@ -207,11 +204,13 @@ if not settings.TEST_SERVER_MODE:
) )
# Without a PAB in place: # Without a PAB in place:
with pytest.raises(ClientError) as ce: with pytest.raises(ClientError) as ce_err:
config_client.get_resource_config_history( config_client.get_resource_config_history(
resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID resourceType="AWS::S3::AccountPublicAccessBlock", resourceId=ACCOUNT_ID
) )
assert ce.value.response["Error"]["Code"] == "ResourceNotDiscoveredException" assert (
ce_err.value.response["Error"]["Code"] == "ResourceNotDiscoveredException"
)
# aggregate # aggregate
result = config_client.batch_get_resource_config( result = config_client.batch_get_resource_config(
resourceKeys=[ resourceKeys=[

View File

@ -1,6 +1,5 @@
"""Test that using both s3 and s3control do not interfere""" """Test that using both s3 and s3control do not interfere"""
import boto3 import boto3
import sure # noqa # pylint: disable=unused-import
from moto import mock_s3, mock_s3control, settings from moto import mock_s3, mock_s3control, settings
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
@ -12,8 +11,8 @@ if not settings.TEST_SERVER_MODE:
@mock_s3control @mock_s3control
def test_pab_are_kept_separate(): def test_pab_are_kept_separate():
client = boto3.client("s3control", region_name="us-east-1") client = boto3.client("s3control", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1") s3_client = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="bucket") s3_client.create_bucket(Bucket="bucket")
client.put_public_access_block( client.put_public_access_block(
AccountId=ACCOUNT_ID, AccountId=ACCOUNT_ID,
@ -25,7 +24,7 @@ if not settings.TEST_SERVER_MODE:
}, },
) )
s3.put_public_access_block( s3_client.put_public_access_block(
Bucket="bucket", Bucket="bucket",
PublicAccessBlockConfiguration={ PublicAccessBlockConfiguration={
"BlockPublicAcls": True, "BlockPublicAcls": True,
@ -36,31 +35,27 @@ if not settings.TEST_SERVER_MODE:
) )
pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID) pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID)
pab_from_control.should.have.key("PublicAccessBlockConfiguration").equals( assert pab_from_control["PublicAccessBlockConfiguration"] == {
{
"BlockPublicAcls": True, "BlockPublicAcls": True,
"IgnorePublicAcls": True, "IgnorePublicAcls": True,
"BlockPublicPolicy": True, "BlockPublicPolicy": True,
"RestrictPublicBuckets": True, "RestrictPublicBuckets": True,
} }
)
pab_from_s3 = s3.get_public_access_block(Bucket="bucket") pab_from_s3 = s3_client.get_public_access_block(Bucket="bucket")
pab_from_s3.should.have.key("PublicAccessBlockConfiguration").equals( assert pab_from_s3["PublicAccessBlockConfiguration"] == {
{
"BlockPublicAcls": True, "BlockPublicAcls": True,
"IgnorePublicAcls": False, "IgnorePublicAcls": False,
"BlockPublicPolicy": True, "BlockPublicPolicy": True,
"RestrictPublicBuckets": False, "RestrictPublicBuckets": False,
} }
)
@mock_s3control @mock_s3control
@mock_s3 @mock_s3
def test_pab_are_kept_separate_with_inverse_mocks(): def test_pab_are_kept_separate_with_inverse_mocks():
client = boto3.client("s3control", region_name="us-east-1") client = boto3.client("s3control", region_name="us-east-1")
s3 = boto3.client("s3", region_name="us-east-1") s3_client = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="bucket") s3_client.create_bucket(Bucket="bucket")
client.put_public_access_block( client.put_public_access_block(
AccountId=ACCOUNT_ID, AccountId=ACCOUNT_ID,
@ -72,7 +67,7 @@ if not settings.TEST_SERVER_MODE:
}, },
) )
s3.put_public_access_block( s3_client.put_public_access_block(
Bucket="bucket", Bucket="bucket",
PublicAccessBlockConfiguration={ PublicAccessBlockConfiguration={
"BlockPublicAcls": True, "BlockPublicAcls": True,
@ -83,21 +78,17 @@ if not settings.TEST_SERVER_MODE:
) )
pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID) pab_from_control = client.get_public_access_block(AccountId=ACCOUNT_ID)
pab_from_control.should.have.key("PublicAccessBlockConfiguration").equals( assert pab_from_control["PublicAccessBlockConfiguration"] == {
{
"BlockPublicAcls": True, "BlockPublicAcls": True,
"IgnorePublicAcls": True, "IgnorePublicAcls": True,
"BlockPublicPolicy": True, "BlockPublicPolicy": True,
"RestrictPublicBuckets": True, "RestrictPublicBuckets": True,
} }
)
pab_from_s3 = s3.get_public_access_block(Bucket="bucket") pab_from_s3 = s3_client.get_public_access_block(Bucket="bucket")
pab_from_s3.should.have.key("PublicAccessBlockConfiguration").equals( assert pab_from_s3["PublicAccessBlockConfiguration"] == {
{
"BlockPublicAcls": True, "BlockPublicAcls": True,
"IgnorePublicAcls": False, "IgnorePublicAcls": False,
"BlockPublicPolicy": True, "BlockPublicPolicy": True,
"RestrictPublicBuckets": False, "RestrictPublicBuckets": False,
} }
)