implemented s3 default encryption methods (#3022)

* implemented s3 default encryption methods

* PR adjustments: moved logic for retrieving bucket's encrypted status to the backend.

Co-authored-by: Joseph Weitekamp <jweite@amazon.com>
This commit is contained in:
jweite 2020-05-27 12:21:03 -04:00 committed by GitHub
parent b7a1b666a8
commit 4d3e3c8c5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 104 additions and 1 deletions

View File

@ -778,6 +778,7 @@ class FakeBucket(BaseModel):
self.payer = "BucketOwner"
self.creation_date = datetime.datetime.utcnow()
self.public_access_block = None
self.encryption = None
@property
def location(self):
@ -1227,6 +1228,9 @@ class S3Backend(BaseBackend):
def get_bucket_versioning(self, bucket_name):
return self.get_bucket(bucket_name).versioning_status
def get_bucket_encryption(self, bucket_name):
return self.get_bucket(bucket_name).encryption
def get_bucket_latest_versions(self, bucket_name):
versions = self.get_bucket_versions(bucket_name)
latest_modified_per_key = {}
@ -1275,6 +1279,12 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
bucket.policy = None
def put_bucket_encryption(self, bucket_name, encryption):
self.get_bucket(bucket_name).encryption = encryption
def delete_bucket_encryption(self, bucket_name):
self.get_bucket(bucket_name).encryption = None
def set_bucket_lifecycle(self, bucket_name, rules):
bucket = self.get_bucket(bucket_name)
bucket.set_lifecycle(rules)

View File

@ -466,6 +466,13 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
is_truncated="false",
),
)
elif "encryption" in querystring:
encryption = self.backend.get_bucket_encryption(bucket_name)
if not encryption:
template = self.response_template(S3_NO_ENCRYPTION)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_ENCRYPTION_CONFIG)
return 200, {}, template.render(encryption=encryption)
elif querystring.get("list-type", [None])[0] == "2":
return 200, {}, self._handle_list_objects_v2(bucket_name, querystring)
@ -703,7 +710,16 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
bucket_name, pab_config["PublicAccessBlockConfiguration"]
)
return ""
elif "encryption" in querystring:
try:
self.backend.put_bucket_encryption(
bucket_name, self._encryption_config_from_xml(body)
)
return ""
except KeyError:
raise MalformedXML()
except Exception as e:
raise e
else:
# us-east-1, the default AWS region behaves a bit differently
# - you should not use it as a location constraint --> it fails
@ -768,6 +784,9 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
elif "publicAccessBlock" in querystring:
self.backend.delete_bucket_public_access_block(bucket_name)
return 204, {}, ""
elif "encryption" in querystring:
bucket = self.backend.delete_bucket_encryption(bucket_name)
return 204, {}, ""
removed_bucket = self.backend.delete_bucket(bucket_name)
@ -1427,6 +1446,22 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return [parsed_xml["CORSConfiguration"]["CORSRule"]]
def _encryption_config_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
if (
not parsed_xml["ServerSideEncryptionConfiguration"].get("Rule")
or not parsed_xml["ServerSideEncryptionConfiguration"]["Rule"].get(
"ApplyServerSideEncryptionByDefault"
)
or not parsed_xml["ServerSideEncryptionConfiguration"]["Rule"][
"ApplyServerSideEncryptionByDefault"
].get("SSEAlgorithm")
):
raise MalformedXML()
return [parsed_xml["ServerSideEncryptionConfiguration"]]
def _logging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
@ -2130,6 +2165,31 @@ S3_NO_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<BucketLoggingStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01" />
"""
S3_ENCRYPTION_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<BucketEncryptionStatus xmlns="http://doc.s3.amazonaws.com/2006-03-01">
{% for entry in encryption %}
<Rule>
<ApplyServerSideEncryptionByDefault>
<SSEAlgorithm>{{ entry["Rule"]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] }}</SSEAlgorithm>
{% if entry["Rule"]["ApplyServerSideEncryptionByDefault"].get("KMSMasterKeyID") %}
<KMSMasterKeyID>{{ entry["Rule"]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] }}</KMSMasterKeyID>
{% endif %}
</ApplyServerSideEncryptionByDefault>
</Rule>
{% endfor %}
</BucketEncryptionStatus>
"""
S3_NO_ENCRYPTION = """<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>ServerSideEncryptionConfigurationNotFoundError</Code>
<Message>The server side encryption configuration was not found</Message>
<BucketName>{{ bucket_name }}</BucketName>
<RequestId>0D68A23BB2E2215B</RequestId>
<HostId>9Gjjt1m+cjU4OPvX9O9/8RuvnG41MRb/18Oux2o5H5MY7ISNTlXN+Dz9IG62/ILVxhAGI0qyPfg=</HostId>
</Error>
"""
S3_GET_BUCKET_NOTIFICATION_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<NotificationConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
{% for topic in bucket.notification_configuration.topic %}

View File

@ -4521,3 +4521,36 @@ def test_creating_presigned_post():
].read()
== fdata
)
@mock_s3
def test_encryption():
# Create Bucket so that test can run
conn = boto3.client("s3", region_name="us-east-1")
conn.create_bucket(Bucket="mybucket")
with assert_raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")
sse_config = {
"Rules": [
{
"ApplyServerSideEncryptionByDefault": {
"SSEAlgorithm": "aws:kms",
"KMSMasterKeyID": "12345678",
}
}
]
}
conn.put_bucket_encryption(
Bucket="mybucket", ServerSideEncryptionConfiguration=sse_config
)
resp = conn.get_bucket_encryption(Bucket="mybucket")
assert "ServerSideEncryptionConfiguration" in resp
assert resp["ServerSideEncryptionConfiguration"] == sse_config
conn.delete_bucket_encryption(Bucket="mybucket")
with assert_raises(ClientError) as exc:
conn.get_bucket_encryption(Bucket="mybucket")