130 lines
4.3 KiB
Python
130 lines
4.3 KiB
Python
import boto3
|
|
import pytest
|
|
|
|
from botocore.exceptions import ClientError
|
|
from moto import mock_s3
|
|
from uuid import uuid4
|
|
|
|
|
|
@mock_s3
def test_encryption_on_new_bucket_fails():
    """GetBucketEncryption fails on a bucket with no encryption configured.

    Creates a bucket, requests its encryption configuration without ever
    setting one, and checks the code, message and bucket name carried in
    the resulting ``ClientError`` response.
    """
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="mybucket")

    with pytest.raises(ClientError) as exc:
        conn.get_bucket_encryption(Bucket="mybucket")

    # Plain asserts keep the checks independent of any attribute patching
    # on the response values.
    err = exc.value.response["Error"]
    assert err["Code"] == "ServerSideEncryptionConfigurationNotFoundError"
    assert (
        err["Message"] == "The server side encryption configuration was not found"
    )
    assert err["BucketName"] == "mybucket"
|
|
|
|
|
|
@mock_s3
def test_put_and_get_encryption():
    """A stored bucket encryption configuration can be read back.

    Puts a KMS-based default-encryption rule on a bucket and checks that
    GetBucketEncryption returns that rule with ``BucketKeyEnabled`` set to
    ``False`` added to it.
    """
    # Create Bucket so that test can run
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="mybucket")

    sse_config = {
        "Rules": [
            {
                "ApplyServerSideEncryptionByDefault": {
                    "SSEAlgorithm": "aws:kms",
                    "KMSMasterKeyID": "12345678",
                }
            }
        ]
    }

    conn.put_bucket_encryption(
        Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
    )

    resp = conn.get_bucket_encryption(Bucket="mybucket")
    assert "ServerSideEncryptionConfiguration" in resp

    # Build the expected value from fresh rule dicts: a shallow
    # ``sse_config.copy()`` shares the nested rule, so adding the key to the
    # copy would also modify the configuration that was sent.
    expected_config = {
        "Rules": [{**rule, "BucketKeyEnabled": False} for rule in sse_config["Rules"]]
    }
    assert resp["ServerSideEncryptionConfiguration"] == expected_config
|
|
|
|
|
|
@mock_s3
def test_delete_and_get_encryption():
    """Deleting a bucket's encryption configuration makes a later GET fail.

    Puts a KMS-based default-encryption rule on a bucket, deletes it, and
    checks that GetBucketEncryption then raises a ``ClientError`` with the
    "configuration not found" error code.
    """
    # Create Bucket so that test can run
    conn = boto3.client("s3", region_name="us-east-1")
    conn.create_bucket(Bucket="mybucket")

    sse_config = {
        "Rules": [
            {
                "ApplyServerSideEncryptionByDefault": {
                    "SSEAlgorithm": "aws:kms",
                    "KMSMasterKeyID": "12345678",
                }
            }
        ]
    }

    conn.put_bucket_encryption(
        Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
    )

    conn.delete_bucket_encryption(Bucket="mybucket")

    # GET now fails, after deleting it, as it no longer exists
    with pytest.raises(ClientError) as exc:
        conn.get_bucket_encryption(Bucket="mybucket")

    err = exc.value.response["Error"]
    assert err["Code"] == "ServerSideEncryptionConfigurationNotFoundError"
|
|
|
|
|
|
@mock_s3
def test_encryption_status_on_new_objects():
    """Bucket default encryption applies only to objects stored afterwards.

    An object written before the bucket encryption is configured must not
    report ``ServerSideEncryption`` in its GetObject response, either before
    or after the configuration is applied. An object written afterwards
    must report ``AES256``.
    """
    bucket_name = str(uuid4())
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket_name)
    s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")

    # verify encryption status on object itself
    res = s3.get_object(Bucket=bucket_name, Key="file.txt")
    assert "ServerSideEncryption" not in res

    # enable encryption
    sse_config = {
        "Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
    }
    s3.put_bucket_encryption(
        Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
    )

    # verify encryption status on existing object hasn't changed
    res = s3.get_object(Bucket=bucket_name, Key="file.txt")
    assert "ServerSideEncryption" not in res

    # create object2
    s3.put_object(Bucket=bucket_name, Body=b"test", Key="file2.txt")

    # verify encryption status on object2
    res = s3.get_object(Bucket=bucket_name, Key="file2.txt")
    assert "ServerSideEncryption" in res
    assert res["ServerSideEncryption"] == "AES256"
|
|
|
|
|
|
@mock_s3
def test_encryption_status_on_copied_objects():
    """A copy made after enabling bucket encryption is reported as encrypted.

    The source object is written before the bucket encryption is configured
    and must still not report ``ServerSideEncryption`` after the copy. The
    copy, created once the configuration is in place, must report
    ``AES256``.
    """
    bucket_name = str(uuid4())
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket_name)
    s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")

    # enable encryption
    sse_config = {
        "Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
    }
    s3.put_bucket_encryption(
        Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
    )

    # copy object
    s3.copy_object(
        CopySource=f"{bucket_name}/file.txt", Bucket=bucket_name, Key="file2.txt"
    )

    # verify encryption status on object1 hasn't changed
    res = s3.get_object(Bucket=bucket_name, Key="file.txt")
    assert "ServerSideEncryption" not in res

    # verify encryption status on object2 does have encryption
    res = s3.get_object(Bucket=bucket_name, Key="file2.txt")
    assert "ServerSideEncryption" in res
    assert res["ServerSideEncryption"] == "AES256"
|