S3:put_object(): Return ChecksumAlgorithm-attribute if supplied (#5735)

This commit is contained in:
Bert Blommers 2022-12-06 22:03:28 -01:00 committed by GitHub
parent 4ec748542f
commit 623be47c4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 2 deletions

View File

@ -121,6 +121,7 @@ class FakeKey(BaseModel, ManagedState):
self.last_modified = datetime.datetime.utcnow()
self.acl = get_canned_acl("private")
self.website_redirect_location = None
self.checksum_algorithm = None
self._storage_class = storage if storage else "STANDARD"
self._metadata = LowercaseDict()
self._expiry = None
@ -258,6 +259,8 @@ class FakeKey(BaseModel, ManagedState):
if self._is_versioned:
res["x-amz-version-id"] = str(self.version_id)
if self.checksum_algorithm is not None:
res["x-amz-sdk-checksum-algorithm"] = self.checksum_algorithm
if self.website_redirect_location:
res["x-amz-website-redirect-location"] = self.website_redirect_location
if self.lock_legal_status:

View File

@ -53,7 +53,12 @@ from .exceptions import (
)
from .models import s3_backends
from .models import get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey
from .utils import bucket_name_from_url, metadata_from_headers, parse_region_from_url
from .utils import (
bucket_name_from_url,
metadata_from_headers,
parse_region_from_url,
compute_checksum,
)
from xml.dom import minidom
@ -1386,6 +1391,12 @@ class S3Response(BaseResponse):
checksum_value = search.group(1) if search else None
if checksum_value:
# TODO: AWS computes the provided value and verifies it's the same
# Afterwards, it should be returned in every subsequent call
response_headers.update({checksum_header: checksum_value})
elif checksum_algorithm:
# If the value is not provided, we compute it and only return it as part of this request
checksum_value = compute_checksum(body, algorithm=checksum_algorithm)
response_headers.update({checksum_header: checksum_value})
# Extract the actual data from the body second
@ -1545,6 +1556,7 @@ class S3Response(BaseResponse):
new_key.website_redirect_location = request.headers.get(
"x-amz-website-redirect-location"
)
new_key.checksum_algorithm = checksum_algorithm
self.backend.set_key_tags(new_key, tagging)
response_headers.update(new_key.response_dict)
@ -2181,6 +2193,9 @@ S3_BUCKET_GET_RESPONSE_V2 = """<?xml version="1.0" encoding="UTF-8"?>
<DisplayName>webfile</DisplayName>
</Owner>
{% endif %}
{% if key.checksum_algorithm %}
<ChecksumAlgorithm>{{ key.checksum_algorithm }}</ChecksumAlgorithm>
{% endif %}
</Contents>
{% endfor %}
{% if delimiter %}

View File

@ -1,6 +1,8 @@
import logging
import base64
import binascii
import re
import hashlib
from urllib.parse import urlparse, unquote, quote
from requests.structures import CaseInsensitiveDict
from typing import Union, Tuple
@ -174,3 +176,21 @@ class _VersionedKeyStore(dict):
items = iteritems = _iteritems
lists = iterlists = _iterlists
values = itervalues = _itervalues
def compute_checksum(body, algorithm):
if algorithm == "SHA1":
hashed_body = _hash(hashlib.sha1, (body,))
elif algorithm == "CRC32" or algorithm == "CRC32C":
hashed_body = f"{binascii.crc32(body)}".encode("utf-8")
else:
hashed_body = _hash(hashlib.sha256, (body,))
return base64.b64encode(hashed_body)
def _hash(fn, args) -> bytes:
try:
return fn(*args, usedforsecurity=False).hexdigest().encode("utf-8")
except TypeError:
# The usedforsecurity-parameter is only available as of Python 3.9
return fn(*args).hexdigest().encode("utf-8")

View File

@ -1396,6 +1396,30 @@ def test_list_objects_v2_truncate_combined_keys_and_folders():
assert resp["CommonPrefixes"][0]["Prefix"] == "3/"
@mock_s3
def test_list_objects_v2_checksum_algo():
s3 = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
s3.create_bucket(Bucket="mybucket")
resp = s3.put_object(
Bucket="mybucket", Key="1", Body="a", ChecksumAlgorithm="CRC32"
)
resp.should.have.key("ChecksumCRC32")
resp["ResponseMetadata"]["HTTPHeaders"][
"x-amz-sdk-checksum-algorithm"
].should.equal("CRC32")
resp = s3.put_object(
Bucket="mybucket", Key="2", Body="b", ChecksumAlgorithm="SHA256"
)
resp.should.have.key("ChecksumSHA256")
resp["ResponseMetadata"]["HTTPHeaders"][
"x-amz-sdk-checksum-algorithm"
].should.equal("SHA256")
resp = s3.list_objects_v2(Bucket="mybucket")["Contents"]
resp[0].should.have.key("ChecksumAlgorithm").equals(["CRC32"])
resp[1].should.have.key("ChecksumAlgorithm").equals(["SHA256"])
@mock_s3
def test_bucket_create():
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)

View File

@ -6,6 +6,7 @@ from moto.s3.utils import (
parse_region_from_url,
clean_key_name,
undo_clean_key_name,
compute_checksum,
)
from unittest.mock import patch
@ -119,3 +120,24 @@ def test_clean_key_name(key, expected):
)
def test_undo_clean_key_name(key, expected):
undo_clean_key_name(key).should.equal(expected)
def test_checksum_sha256():
checksum = b"ODdkMTQ5Y2I0MjRjMDM4NzY1NmYyMTFkMjU4OWZiNWIxZTE2MjI5OTIxMzA5ZTk4NTg4NDE5Y2NjYThhNzM2Mg=="
compute_checksum(b"somedata", "SHA256").should.equal(checksum)
# Unknown algorithms fallback to SHA256 for now
compute_checksum(b"somedata", algorithm="unknown").should.equal(checksum)
def test_checksum_sha1():
compute_checksum(b"somedata", "SHA1").should.equal(
b"ZWZhYTMxMWFlNDQ4YTczNzRjMTIyMDYxYmZlZDk1MmQ5NDBlOWUzNw=="
)
def test_checksum_crc32():
compute_checksum(b"somedata", "CRC32").should.equal(b"MTM5MzM0Mzk1Mg==")
def test_checksum_crc32c():
compute_checksum(b"somedata", "CRC32C").should.equal(b"MTM5MzM0Mzk1Mg==")