2023-08-07 16:48:48 +00:00
|
|
|
from uuid import uuid4
|
|
|
|
|
2021-11-06 23:12:01 +00:00
|
|
|
import boto3
|
|
|
|
import pytest
|
2023-11-30 15:55:51 +00:00
|
|
|
from botocore.exceptions import ClientError
|
2021-11-06 23:12:01 +00:00
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
from moto import mock_aws
|
2021-11-06 23:12:01 +00:00
|
|
|
|
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
@mock_aws
def test_encryption_on_new_bucket_fails():
    """Fetching the encryption config of a freshly created bucket raises.

    The error is expected to carry the
    ``ServerSideEncryptionConfigurationNotFoundError`` code, its message and
    the name of the bucket that was queried.
    """
    bucket = "mybucket"
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket)

    # Nothing has been stored with put_bucket_encryption, so the lookup must fail
    with pytest.raises(ClientError) as raised:
        s3.get_bucket_encryption(Bucket=bucket)

    error = raised.value.response["Error"]
    assert error["Code"] == "ServerSideEncryptionConfigurationNotFoundError"
    assert error["Message"] == "The server side encryption configuration was not found"
    assert error["BucketName"] == bucket
|
2021-11-06 23:12:01 +00:00
|
|
|
|
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
@mock_aws
def test_put_and_get_encryption():
    """Store a KMS encryption configuration on a bucket and read it back.

    The configuration returned by ``get_bucket_encryption`` is expected to
    equal the one that was stored, with ``BucketKeyEnabled: False`` added to
    the rule.
    """
    # Create Bucket so that test can run
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="mybucket")

    sse_config = {
        "Rules": [
            {
                "ApplyServerSideEncryptionByDefault": {
                    "SSEAlgorithm": "aws:kms",
                    "KMSMasterKeyID": "12345678",
                }
            }
        ]
    }

    conn.put_bucket_encryption(
        Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
    )

    resp = conn.get_bucket_encryption(Bucket="mybucket")
    assert "ServerSideEncryptionConfiguration" in resp

    # Build the expected value from fresh rule dicts. dict.copy() is shallow,
    # so writing "BucketKeyEnabled" through a copy would also modify the rule
    # held by sse_config itself.
    return_config = {
        "Rules": [{**rule, "BucketKeyEnabled": False} for rule in sse_config["Rules"]]
    }
    assert resp["ServerSideEncryptionConfiguration"] == return_config
|
2021-11-06 23:12:01 +00:00
|
|
|
|
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
@mock_aws
def test_delete_and_get_encryption():
    """After delete_bucket_encryption, get_bucket_encryption raises again."""
    bucket = "mybucket"
    s3 = boto3.client("s3", region_name="us-east-1")
    # The bucket has to exist before a configuration can be attached to it
    s3.create_bucket(Bucket=bucket)

    kms_default = {"SSEAlgorithm": "aws:kms", "KMSMasterKeyID": "12345678"}
    encryption = {"Rules": [{"ApplyServerSideEncryptionByDefault": kms_default}]}
    s3.put_bucket_encryption(
        Bucket=bucket, ServerSideEncryptionConfiguration=encryption
    )

    s3.delete_bucket_encryption(Bucket=bucket)

    # The configuration was just removed, so reading it must fail
    with pytest.raises(ClientError) as raised:
        s3.get_bucket_encryption(Bucket=bucket)

    error_code = raised.value.response["Error"]["Code"]
    assert error_code == "ServerSideEncryptionConfigurationNotFoundError"
|
2021-11-06 23:12:01 +00:00
|
|
|
|
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
@mock_aws
def test_encryption_status_on_new_objects():
    """Check which objects report encryption once AES256 is set on the bucket.

    The object written before the configuration is applied is expected to
    keep returning no ``ServerSideEncryption`` field, while the object
    written afterwards is expected to return ``AES256``.
    """
    bucket = str(uuid4())
    client = boto3.client("s3", region_name="us-east-1")

    def fetch(key):
        # GetObject response for ``key`` in this test's bucket
        return client.get_object(Bucket=bucket, Key=key)

    client.create_bucket(Bucket=bucket)
    client.put_object(Bucket=bucket, Body=b"test", Key="file.txt")

    # The first object was written without any bucket encryption configured
    assert "ServerSideEncryption" not in fetch("file.txt")

    # Turn on default AES256 encryption for the bucket
    aes_rule = {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
    client.put_bucket_encryption(
        Bucket=bucket, ServerSideEncryptionConfiguration={"Rules": [aes_rule]}
    )

    # The pre-existing object must be unaffected by the new configuration
    assert "ServerSideEncryption" not in fetch("file.txt")

    # A second object, written after the configuration was applied
    client.put_object(Bucket=bucket, Body=b"test", Key="file2.txt")
    assert fetch("file2.txt")["ServerSideEncryption"] == "AES256"
|
2021-11-06 23:12:01 +00:00
|
|
|
|
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
@mock_aws
def test_encryption_status_on_copied_objects():
    """Check encryption on a copy made after AES256 is set on the bucket.

    The source object, written before the configuration was applied, is
    expected to keep returning no ``ServerSideEncryption`` field; the copy
    created afterwards is expected to return ``AES256``.
    """
    bucket = str(uuid4())
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket=bucket)
    client.put_object(Bucket=bucket, Body=b"test", Key="file.txt")

    # Turn on default AES256 encryption for the bucket
    aes_rule = {"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}
    client.put_bucket_encryption(
        Bucket=bucket, ServerSideEncryptionConfiguration={"Rules": [aes_rule]}
    )

    # Duplicate the original object inside the same bucket
    source = f"{bucket}/file.txt"
    client.copy_object(CopySource=source, Bucket=bucket, Key="file2.txt")

    # The source object is untouched by the copy
    original = client.get_object(Bucket=bucket, Key="file.txt")
    assert "ServerSideEncryption" not in original

    # The copy was created under the bucket's encryption configuration
    duplicate = client.get_object(Bucket=bucket, Key="file2.txt")
    assert duplicate["ServerSideEncryption"] == "AES256"
|
2023-05-10 10:03:03 +00:00
|
|
|
|
|
|
|
|
2024-01-07 12:03:33 +00:00
|
|
|
@mock_aws
def test_encryption_bucket_key_for_aes_not_returned():
    """put_object omits BucketKeyEnabled when the bucket uses an AES256 rule.

    The rule is stored with ``BucketKeyEnabled`` explicitly set to ``False``.
    """
    bucket = str(uuid4())
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket=bucket)

    # Turn on AES256 encryption with the bucket key explicitly disabled
    aes_rule = {
        "ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"},
        "BucketKeyEnabled": False,
    }
    client.put_bucket_encryption(
        Bucket=bucket, ServerSideEncryptionConfiguration={"Rules": [aes_rule]}
    )

    upload = client.put_object(Bucket=bucket, Body=b"test", Key="file.txt")
    assert "BucketKeyEnabled" not in upload
|