moto/tests/test_s3/test_s3_encryption.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

130 lines
4.3 KiB
Python
Raw Normal View History

import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_s3
from uuid import uuid4
@mock_s3
def test_encryption_on_new_bucket_fails():
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError")
err["Message"].should.equal(
"The server side encryption configuration was not found"
)
err["BucketName"].should.equal("mybucket")
@mock_s3
def test_put_and_get_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
resp = conn.get_bucket_encryption(Bucket="mybucket")
assert "ServerSideEncryptionConfiguration" in resp
return_config = sse_config.copy()
return_config["Rules"][0]["BucketKeyEnabled"] = False
assert resp["ServerSideEncryptionConfiguration"].should.equal(return_config)
@mock_s3
def test_delete_and_get_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
conn.delete_bucket_encryption(Bucket="mybucket")
# GET now fails, after deleting it, as it no longer exists
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError")
@mock_s3
def test_encryption_status_on_new_objects():
bucket_name = str(uuid4())
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket=bucket_name)
s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
# verify encryption status on object itself
res = s3.get_object(Bucket=bucket_name, Key="file.txt")
res.shouldnt.have.key("ServerSideEncryption")
# enable encryption
sse_config = {
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
}
s3.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# verify encryption status on existing object hasn't changed
res = s3.get_object(Bucket=bucket_name, Key="file.txt")
res.shouldnt.have.key("ServerSideEncryption")
# create object2
s3.put_object(Bucket=bucket_name, Body=b"test", Key="file2.txt")
# verify encryption status on object2
res = s3.get_object(Bucket=bucket_name, Key="file2.txt")
res.should.have.key("ServerSideEncryption").equals("AES256")
@mock_s3
def test_encryption_status_on_copied_objects():
bucket_name = str(uuid4())
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket=bucket_name)
s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
# enable encryption
sse_config = {
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
}
s3.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# copy object
s3.copy_object(
CopySource=f"{bucket_name}/file.txt", Bucket=bucket_name, Key="file2.txt"
)
# verify encryption status on object1 hasn't changed
res = s3.get_object(Bucket=bucket_name, Key="file.txt")
res.shouldnt.have.key("ServerSideEncryption")
# verify encryption status on object2 does have encryption
res = s3.get_object(Bucket=bucket_name, Key="file2.txt")
res.should.have.key("ServerSideEncryption").equals("AES256")