Feature: S3 - enable encryption when copying keys (#4535)

This commit is contained in:
Bert Blommers 2021-11-06 22:12:01 -01:00 committed by GitHub
parent 3353677c72
commit eb1d127851
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 148 additions and 69 deletions

View File

@ -173,15 +173,6 @@ class FakeKey(BaseModel):
self._value_buffer.write(new_value)
self.contentsize = len(new_value)
def copy(self, new_name=None, new_is_versioned=None):
r = copy.deepcopy(self)
if new_name is not None:
r.name = new_name
if new_is_versioned is not None:
r._is_versioned = new_is_versioned
r.refresh_version()
return r
def set_metadata(self, metadata, replace=False):
if replace:
self._metadata = {}
@ -213,10 +204,6 @@ class FakeKey(BaseModel):
def restore(self, days):
self._expiry = datetime.datetime.utcnow() + datetime.timedelta(days)
def refresh_version(self):
self._version_id = str(uuid.uuid4())
self.last_modified = datetime.datetime.utcnow()
@property
def etag(self):
if self._etag is None:
@ -1587,15 +1574,12 @@ class S3Backend(BaseBackend):
# getting default config from bucket if not included in put request
if bucket.encryption:
bucket_key_enabled = (
bucket_key_enabled or bucket.encryption["Rule"]["BucketKeyEnabled"]
)
kms_key_id = (
kms_key_id
or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][
"KMSMasterKeyID"
]
bucket_key_enabled = bucket_key_enabled or bucket.encryption["Rule"].get(
"BucketKeyEnabled", False
)
kms_key_id = kms_key_id or bucket.encryption["Rule"][
"ApplyServerSideEncryptionByDefault"
].get("KMSMasterKeyID")
encryption = (
encryption
or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][
@ -2022,23 +2006,30 @@ class S3Backend(BaseBackend):
acl=None,
src_version_id=None,
):
dest_key_name = clean_key_name(dest_key_name)
dest_bucket = self.get_bucket(dest_bucket_name)
key = self.get_object(src_bucket_name, src_key_name, version_id=src_version_id)
new_key = key.copy(dest_key_name, dest_bucket.is_versioned)
new_key = self.put_object(
bucket_name=dest_bucket_name,
key_name=dest_key_name,
value=key.value,
storage=storage or key.storage_class,
etag=key.etag,
multipart=key.multipart,
encryption=key.encryption,
kms_key_id=key.kms_key_id,
bucket_key_enabled=key.bucket_key_enabled,
lock_mode=key.lock_mode,
lock_legal_status=key.lock_legal_status,
lock_until=key.lock_until,
)
self.tagger.copy_tags(key.arn, new_key.arn)
if storage is not None:
new_key.set_storage_class(storage)
if acl is not None:
new_key.set_acl(acl)
if key.storage_class in "GLACIER":
# Object copied from Glacier object should not have expiry
new_key.set_expiry(None)
dest_bucket.keys[dest_key_name] = new_key
def put_bucket_acl(self, bucket_name, acl):
bucket = self.get_bucket(bucket_name)
bucket.set_acl(acl)

View File

@ -6164,47 +6164,6 @@ def test_creating_presigned_post():
assert s3.get_object(Bucket=bucket, Key=real_key)["Body"].read() == fdata
@mock_s3
def test_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
with pytest.raises(ClientError):
conn.get_bucket_encryption(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
resp = conn.get_bucket_encryption(Bucket="mybucket")
assert "ServerSideEncryptionConfiguration" in resp
return_config = sse_config.copy()
return_config["Rules"][0]["BucketKeyEnabled"] = False
assert resp["ServerSideEncryptionConfiguration"].should.equal(return_config)
conn.delete_bucket_encryption(Bucket="mybucket")
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError")
err["Message"].should.equal(
"The server side encryption configuration was not found"
)
err["BucketName"].should.equal("mybucket")
@mock_s3
def test_presigned_put_url_with_approved_headers():
bucket = str(uuid.uuid4())

View File

@ -0,0 +1,129 @@
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_s3
from uuid import uuid4
@mock_s3
def test_encryption_on_new_bucket_fails():
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError")
err["Message"].should.equal(
"The server side encryption configuration was not found"
)
err["BucketName"].should.equal("mybucket")
@mock_s3
def test_put_and_get_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
resp = conn.get_bucket_encryption(Bucket="mybucket")
assert "ServerSideEncryptionConfiguration" in resp
return_config = sse_config.copy()
return_config["Rules"][0]["BucketKeyEnabled"] = False
assert resp["ServerSideEncryptionConfiguration"].should.equal(return_config)
@mock_s3
def test_delete_and_get_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
conn.delete_bucket_encryption(Bucket="mybucket")
# GET now fails, after deleting it, as it no longer exists
with pytest.raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
err = exc.value.response["Error"]
err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError")
@mock_s3
def test_encryption_status_on_new_objects():
bucket_name = str(uuid4())
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket=bucket_name)
s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
# verify encryption status on object itself
res = s3.get_object(Bucket=bucket_name, Key="file.txt")
res.shouldnt.have.key("ServerSideEncryption")
# enable encryption
sse_config = {
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
}
s3.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# verify encryption status on existing object hasn't changed
res = s3.get_object(Bucket=bucket_name, Key="file.txt")
res.shouldnt.have.key("ServerSideEncryption")
# create object2
s3.put_object(Bucket=bucket_name, Body=b"test", Key="file2.txt")
# verify encryption status on object2
res = s3.get_object(Bucket=bucket_name, Key="file2.txt")
res.should.have.key("ServerSideEncryption").equals("AES256")
@mock_s3
def test_encryption_status_on_copied_objects():
bucket_name = str(uuid4())
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket=bucket_name)
s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt")
# enable encryption
sse_config = {
"Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}]
}
s3.put_bucket_encryption(
Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config
)
# copy object
s3.copy_object(
CopySource=f"{bucket_name}/file.txt", Bucket=bucket_name, Key="file2.txt"
)
# verify encryption status on object1 hasn't changed
res = s3.get_object(Bucket=bucket_name, Key="file.txt")
res.shouldnt.have.key("ServerSideEncryption")
# verify encryption status on object2 does have encryption
res = s3.get_object(Bucket=bucket_name, Key="file2.txt")
res.should.have.key("ServerSideEncryption").equals("AES256")