diff --git a/moto/s3/models.py b/moto/s3/models.py index fc9e567f1..4bbaa1a3f 100644 --- a/moto/s3/models.py +++ b/moto/s3/models.py @@ -173,15 +173,6 @@ class FakeKey(BaseModel): self._value_buffer.write(new_value) self.contentsize = len(new_value) - def copy(self, new_name=None, new_is_versioned=None): - r = copy.deepcopy(self) - if new_name is not None: - r.name = new_name - if new_is_versioned is not None: - r._is_versioned = new_is_versioned - r.refresh_version() - return r - def set_metadata(self, metadata, replace=False): if replace: self._metadata = {} @@ -213,10 +204,6 @@ class FakeKey(BaseModel): def restore(self, days): self._expiry = datetime.datetime.utcnow() + datetime.timedelta(days) - def refresh_version(self): - self._version_id = str(uuid.uuid4()) - self.last_modified = datetime.datetime.utcnow() - @property def etag(self): if self._etag is None: @@ -1587,15 +1574,12 @@ class S3Backend(BaseBackend): # getting default config from bucket if not included in put request if bucket.encryption: - bucket_key_enabled = ( - bucket_key_enabled or bucket.encryption["Rule"]["BucketKeyEnabled"] - ) - kms_key_id = ( - kms_key_id - or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][ - "KMSMasterKeyID" - ] + bucket_key_enabled = bucket_key_enabled or bucket.encryption["Rule"].get( + "BucketKeyEnabled", False ) + kms_key_id = kms_key_id or bucket.encryption["Rule"][ + "ApplyServerSideEncryptionByDefault" + ].get("KMSMasterKeyID") encryption = ( encryption or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][ @@ -2022,23 +2006,30 @@ class S3Backend(BaseBackend): acl=None, src_version_id=None, ): - dest_key_name = clean_key_name(dest_key_name) - dest_bucket = self.get_bucket(dest_bucket_name) key = self.get_object(src_bucket_name, src_key_name, version_id=src_version_id) - new_key = key.copy(dest_key_name, dest_bucket.is_versioned) + new_key = self.put_object( + bucket_name=dest_bucket_name, + key_name=dest_key_name, + value=key.value, + storage=storage or key.storage_class, + etag=key.etag, + multipart=key.multipart, + encryption=key.encryption, + kms_key_id=key.kms_key_id, + bucket_key_enabled=key.bucket_key_enabled, + lock_mode=key.lock_mode, + lock_legal_status=key.lock_legal_status, + lock_until=key.lock_until, + ) self.tagger.copy_tags(key.arn, new_key.arn) - if storage is not None: - new_key.set_storage_class(storage) if acl is not None: new_key.set_acl(acl) if key.storage_class in "GLACIER": # Object copied from Glacier object should not have expiry new_key.set_expiry(None) - dest_bucket.keys[dest_key_name] = new_key - def put_bucket_acl(self, bucket_name, acl): bucket = self.get_bucket(bucket_name) bucket.set_acl(acl) diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py index 95887e929..fdf517a47 100644 --- a/tests/test_s3/test_s3.py +++ b/tests/test_s3/test_s3.py @@ -6164,47 +6164,6 @@ def test_creating_presigned_post(): assert s3.get_object(Bucket=bucket, Key=real_key)["Body"].read() == fdata -@mock_s3 -def test_encryption(): - # Create Bucket so that test can run - conn = boto3.client("s3", region_name="us-east-1") - conn.create_bucket(Bucket="mybucket") - - with pytest.raises(ClientError): - conn.get_bucket_encryption(Bucket="mybucket") - - sse_config = { - "Rules": [ - { - "ApplyServerSideEncryptionByDefault": { - "SSEAlgorithm": "aws:kms", - "KMSMasterKeyID": "12345678", - } - } - ] - } - - conn.put_bucket_encryption( - Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config - ) - - resp = conn.get_bucket_encryption(Bucket="mybucket") - assert "ServerSideEncryptionConfiguration" in resp - return_config = sse_config.copy() - return_config["Rules"][0]["BucketKeyEnabled"] = False - assert resp["ServerSideEncryptionConfiguration"].should.equal(return_config) - - conn.delete_bucket_encryption(Bucket="mybucket") - with pytest.raises(ClientError) as exc: - conn.get_bucket_encryption(Bucket="mybucket") - err = exc.value.response["Error"] - err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError") - err["Message"].should.equal( - "The server side encryption configuration was not found" - ) - err["BucketName"].should.equal("mybucket") - - @mock_s3 def test_presigned_put_url_with_approved_headers(): bucket = str(uuid.uuid4()) diff --git a/tests/test_s3/test_s3_encryption.py b/tests/test_s3/test_s3_encryption.py new file mode 100644 index 000000000..7a9829d70 --- /dev/null +++ b/tests/test_s3/test_s3_encryption.py @@ -0,0 +1,129 @@ +import boto3 +import pytest + +from botocore.exceptions import ClientError +from moto import mock_s3 +from uuid import uuid4 + + +@mock_s3 +def test_encryption_on_new_bucket_fails(): + conn = boto3.client("s3", region_name="us-east-1") + conn.create_bucket(Bucket="mybucket") + + with pytest.raises(ClientError) as exc: + conn.get_bucket_encryption(Bucket="mybucket") + err = exc.value.response["Error"] + err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError") + err["Message"].should.equal( + "The server side encryption configuration was not found" + ) + err["BucketName"].should.equal("mybucket") + + +@mock_s3 +def test_put_and_get_encryption(): + # Create Bucket so that test can run + conn = boto3.client("s3", region_name="us-east-1") + conn.create_bucket(Bucket="mybucket") + + sse_config = { + "Rules": [ + { + "ApplyServerSideEncryptionByDefault": { + "SSEAlgorithm": "aws:kms", + "KMSMasterKeyID": "12345678", + } + } + ] + } + + conn.put_bucket_encryption( + Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config + ) + + resp = conn.get_bucket_encryption(Bucket="mybucket") + assert "ServerSideEncryptionConfiguration" in resp + return_config = sse_config.copy() + return_config["Rules"][0]["BucketKeyEnabled"] = False + assert resp["ServerSideEncryptionConfiguration"].should.equal(return_config) + + +@mock_s3 +def test_delete_and_get_encryption(): + # Create Bucket so that test can run + conn = boto3.client("s3", region_name="us-east-1") + conn.create_bucket(Bucket="mybucket") + + sse_config = { + "Rules": [ + { + "ApplyServerSideEncryptionByDefault": { + "SSEAlgorithm": "aws:kms", + "KMSMasterKeyID": "12345678", + } + } + ] + } + + conn.put_bucket_encryption( + Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config + ) + + conn.delete_bucket_encryption(Bucket="mybucket") + # GET now fails, after deleting it, as it no longer exists + with pytest.raises(ClientError) as exc: + conn.get_bucket_encryption(Bucket="mybucket") + err = exc.value.response["Error"] + err["Code"].should.equal("ServerSideEncryptionConfigurationNotFoundError") + + +@mock_s3 +def test_encryption_status_on_new_objects(): + bucket_name = str(uuid4()) + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket=bucket_name) + s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt") + # verify encryption status on object itself + res = s3.get_object(Bucket=bucket_name, Key="file.txt") + res.shouldnt.have.key("ServerSideEncryption") + # enable encryption + sse_config = { + "Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}] + } + s3.put_bucket_encryption( + Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config + ) + # verify encryption status on existing object hasn't changed + res = s3.get_object(Bucket=bucket_name, Key="file.txt") + res.shouldnt.have.key("ServerSideEncryption") + # create object2 + s3.put_object(Bucket=bucket_name, Body=b"test", Key="file2.txt") + # verify encryption status on object2 + res = s3.get_object(Bucket=bucket_name, Key="file2.txt") + res.should.have.key("ServerSideEncryption").equals("AES256") + + +@mock_s3 +def test_encryption_status_on_copied_objects(): + bucket_name = str(uuid4()) + s3 = boto3.client("s3", region_name="us-east-1") + s3.create_bucket(Bucket=bucket_name) + s3.put_object(Bucket=bucket_name, Body=b"test", Key="file.txt") + # enable encryption + sse_config = { + "Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}] + } + s3.put_bucket_encryption( + Bucket=bucket_name, ServerSideEncryptionConfiguration=sse_config + ) + # copy object + s3.copy_object( + CopySource=f"{bucket_name}/file.txt", Bucket=bucket_name, Key="file2.txt" + ) + # verify encryption status on object1 hasn't changed + res = s3.get_object(Bucket=bucket_name, Key="file.txt") + res.shouldnt.have.key("ServerSideEncryption") + # verify encryption status on object2 does have encryption + res = s3.get_object(Bucket=bucket_name, Key="file2.txt") + res.should.have.key("ServerSideEncryption").equals("AES256")