diff --git a/moto/s3/models.py b/moto/s3/models.py
index 8971a8691..a9b4f256a 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -121,6 +121,7 @@ class FakeKey(BaseModel, ManagedState):
self.last_modified = datetime.datetime.utcnow()
self.acl = get_canned_acl("private")
self.website_redirect_location = None
+ self.checksum_algorithm = None
self._storage_class = storage if storage else "STANDARD"
self._metadata = LowercaseDict()
self._expiry = None
@@ -258,6 +259,8 @@ class FakeKey(BaseModel, ManagedState):
if self._is_versioned:
res["x-amz-version-id"] = str(self.version_id)
+ if self.checksum_algorithm is not None:
+ res["x-amz-sdk-checksum-algorithm"] = self.checksum_algorithm
if self.website_redirect_location:
res["x-amz-website-redirect-location"] = self.website_redirect_location
if self.lock_legal_status:
diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index 0fed5ea3e..b3ce38005 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -53,7 +53,12 @@ from .exceptions import (
)
from .models import s3_backends
from .models import get_canned_acl, FakeGrantee, FakeGrant, FakeAcl, FakeKey
-from .utils import bucket_name_from_url, metadata_from_headers, parse_region_from_url
+from .utils import (
+ bucket_name_from_url,
+ metadata_from_headers,
+ parse_region_from_url,
+ compute_checksum,
+)
from xml.dom import minidom
@@ -1386,6 +1391,12 @@ class S3Response(BaseResponse):
checksum_value = search.group(1) if search else None
if checksum_value:
+ # TODO: AWS computes the provided value and verifies it's the same
+ # Afterwards, it should be returned in every subsequent call
+ response_headers.update({checksum_header: checksum_value})
+ elif checksum_algorithm:
+ # If the value is not provided, we compute it and only return it as part of this request
+ checksum_value = compute_checksum(body, algorithm=checksum_algorithm)
response_headers.update({checksum_header: checksum_value})
# Extract the actual data from the body second
@@ -1545,6 +1556,7 @@ class S3Response(BaseResponse):
new_key.website_redirect_location = request.headers.get(
"x-amz-website-redirect-location"
)
+ new_key.checksum_algorithm = checksum_algorithm
self.backend.set_key_tags(new_key, tagging)
response_headers.update(new_key.response_dict)
@@ -2181,6 +2193,9 @@ S3_BUCKET_GET_RESPONSE_V2 = """
webfile
{% endif %}
+ {% if key.checksum_algorithm %}
+ {{ key.checksum_algorithm }}
+ {% endif %}
{% endfor %}
{% if delimiter %}
diff --git a/moto/s3/utils.py b/moto/s3/utils.py
index fcf404484..11c53d59e 100644
--- a/moto/s3/utils.py
+++ b/moto/s3/utils.py
@@ -1,6 +1,8 @@
import logging
-
+import base64
+import binascii
import re
+import hashlib
from urllib.parse import urlparse, unquote, quote
from requests.structures import CaseInsensitiveDict
from typing import Union, Tuple
@@ -174,3 +176,21 @@ class _VersionedKeyStore(dict):
items = iteritems = _iteritems
lists = iterlists = _iterlists
values = itervalues = _itervalues
+
+
+def compute_checksum(body, algorithm):
+ if algorithm == "SHA1":
+ hashed_body = _hash(hashlib.sha1, (body,))
+ elif algorithm == "CRC32" or algorithm == "CRC32C":
+ hashed_body = f"{binascii.crc32(body)}".encode("utf-8")
+ else:
+ hashed_body = _hash(hashlib.sha256, (body,))
+ return base64.b64encode(hashed_body)
+
+
+def _hash(fn, args) -> bytes:
+ try:
+ return fn(*args, usedforsecurity=False).hexdigest().encode("utf-8")
+ except TypeError:
+ # The usedforsecurity-parameter is only available as of Python 3.9
+ return fn(*args).hexdigest().encode("utf-8")
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index 2c41430ad..3549cc19f 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -1396,6 +1396,30 @@ def test_list_objects_v2_truncate_combined_keys_and_folders():
assert resp["CommonPrefixes"][0]["Prefix"] == "3/"
+@mock_s3
+def test_list_objects_v2_checksum_algo():
+ s3 = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
+ s3.create_bucket(Bucket="mybucket")
+ resp = s3.put_object(
+ Bucket="mybucket", Key="1", Body="a", ChecksumAlgorithm="CRC32"
+ )
+ resp.should.have.key("ChecksumCRC32")
+ resp["ResponseMetadata"]["HTTPHeaders"][
+ "x-amz-sdk-checksum-algorithm"
+ ].should.equal("CRC32")
+ resp = s3.put_object(
+ Bucket="mybucket", Key="2", Body="b", ChecksumAlgorithm="SHA256"
+ )
+ resp.should.have.key("ChecksumSHA256")
+ resp["ResponseMetadata"]["HTTPHeaders"][
+ "x-amz-sdk-checksum-algorithm"
+ ].should.equal("SHA256")
+
+ resp = s3.list_objects_v2(Bucket="mybucket")["Contents"]
+ resp[0].should.have.key("ChecksumAlgorithm").equals(["CRC32"])
+ resp[1].should.have.key("ChecksumAlgorithm").equals(["SHA256"])
+
+
@mock_s3
def test_bucket_create():
s3 = boto3.resource("s3", region_name=DEFAULT_REGION_NAME)
diff --git a/tests/test_s3/test_s3_utils.py b/tests/test_s3/test_s3_utils.py
index efc65bc02..8e2a4f294 100644
--- a/tests/test_s3/test_s3_utils.py
+++ b/tests/test_s3/test_s3_utils.py
@@ -6,6 +6,7 @@ from moto.s3.utils import (
parse_region_from_url,
clean_key_name,
undo_clean_key_name,
+ compute_checksum,
)
from unittest.mock import patch
@@ -119,3 +120,24 @@ def test_clean_key_name(key, expected):
)
def test_undo_clean_key_name(key, expected):
undo_clean_key_name(key).should.equal(expected)
+
+
+def test_checksum_sha256():
+ checksum = b"ODdkMTQ5Y2I0MjRjMDM4NzY1NmYyMTFkMjU4OWZiNWIxZTE2MjI5OTIxMzA5ZTk4NTg4NDE5Y2NjYThhNzM2Mg=="
+ compute_checksum(b"somedata", "SHA256").should.equal(checksum)
+ # Unknown algorithms fallback to SHA256 for now
+ compute_checksum(b"somedata", algorithm="unknown").should.equal(checksum)
+
+
+def test_checksum_sha1():
+ compute_checksum(b"somedata", "SHA1").should.equal(
+ b"ZWZhYTMxMWFlNDQ4YTczNzRjMTIyMDYxYmZlZDk1MmQ5NDBlOWUzNw=="
+ )
+
+
+def test_checksum_crc32():
+ compute_checksum(b"somedata", "CRC32").should.equal(b"MTM5MzM0Mzk1Mg==")
+
+
+def test_checksum_crc32c():
+ compute_checksum(b"somedata", "CRC32C").should.equal(b"MTM5MzM0Mzk1Mg==")