S3: fix checksum calculation (#6309)

* fix S3 compute checksums

* format

* fix test

* add copying of checksum between objects

* fix unit tests for checksum
This commit is contained in:
Ben Simon Hartung 2023-05-12 15:00:00 +03:00 committed by GitHub
parent 922ee7edc1
commit 760fa8fdc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 121 additions and 10 deletions

View File

@ -2418,6 +2418,10 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
# Object copied from Glacier object should not have expiry # Object copied from Glacier object should not have expiry
new_key.set_expiry(None) new_key.set_expiry(None)
if src_key.checksum_value:
new_key.checksum_value = src_key.checksum_value
new_key.checksum_algorithm = src_key.checksum_algorithm
# Send notifications that an object was copied # Send notifications that an object was copied
notifications.send_event( notifications.send_event(
self.account_id, notifications.S3_OBJECT_CREATE_COPY, bucket, new_key self.account_id, notifications.S3_OBJECT_CREATE_COPY, bucket, new_key

View File

@ -200,7 +200,7 @@ def compute_checksum(body: bytes, algorithm: str) -> bytes:
if algorithm == "SHA1": if algorithm == "SHA1":
hashed_body = _hash(hashlib.sha1, (body,)) hashed_body = _hash(hashlib.sha1, (body,))
elif algorithm == "CRC32" or algorithm == "CRC32C": elif algorithm == "CRC32" or algorithm == "CRC32C":
hashed_body = f"{binascii.crc32(body)}".encode("utf-8") hashed_body = binascii.crc32(body).to_bytes(4, "big")
else: else:
hashed_body = _hash(hashlib.sha256, (body,)) hashed_body = _hash(hashlib.sha256, (body,))
return base64.b64encode(hashed_body) return base64.b64encode(hashed_body)
@ -208,10 +208,10 @@ def compute_checksum(body: bytes, algorithm: str) -> bytes:
def _hash(fn: Any, args: Any) -> bytes: def _hash(fn: Any, args: Any) -> bytes:
try: try:
return fn(*args, usedforsecurity=False).hexdigest().encode("utf-8") return fn(*args, usedforsecurity=False).digest()
except TypeError: except TypeError:
# The usedforsecurity-parameter is only available as of Python 3.9 # The usedforsecurity-parameter is only available as of Python 3.9
return fn(*args).hexdigest().encode("utf-8") return fn(*args).digest()
def cors_matches_origin(origin_header: str, allowed_origins: List[str]) -> bool: def cors_matches_origin(origin_header: str, allowed_origins: List[str]) -> bool:

View File

@ -46,7 +46,7 @@ def test_copy_key_boto3_with_sha256_checksum():
key_name = "key" key_name = "key"
new_key = "new_key" new_key = "new_key"
bucket = "foobar" bucket = "foobar"
expected_hash = "YWIzZDA3ZjMxNjljY2JkMGVkNmM0YjQ1ZGUyMTUxOWY5ZjkzOGM3MmQyNDEyNDk5OGFhYjk0OWNlODNiYjUxYg==" expected_hash = "qz0H8xacy9DtbEtF3iFRn5+TjHLSQSSZiquUnOg7tRs="
s3.create_bucket(Bucket=bucket) s3.create_bucket(Bucket=bucket)
key = s3.Object("foobar", key_name) key = s3.Object("foobar", key_name)
@ -730,3 +730,112 @@ def test_copy_object_in_place_with_bucket_encryption():
Key=key, Key=key,
) )
assert response["ServerSideEncryption"] == "AES256" assert response["ServerSideEncryption"] == "AES256"
@mock_s3
@pytest.mark.parametrize(
"algorithm",
["CRC32", "SHA1", "SHA256"],
)
def test_copy_key_boto3_with_both_sha256_checksum(algorithm):
# This test will validate that moto S3 checksum calculations are correct
# We first create an object with a Checksum calculated by boto, by specifying ChecksumAlgorithm="SHA256"
# we then retrieve the right checksum from this request
# we copy the object while requesting moto to recalculate the checksum for that key
# we verify that both checksums are equal
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
source_key = "source-key"
dest_key = "dest-key"
bucket = "foobar"
body = b"checksum-test"
client.create_bucket(Bucket=bucket)
checksum_key = f"Checksum{algorithm}"
resp = client.put_object(
Bucket=bucket,
Key=source_key,
Body=body,
ChecksumAlgorithm=algorithm,
)
assert checksum_key in resp
checksum_by_boto = resp[checksum_key]
resp = client.copy_object(
Bucket=bucket,
CopySource=f"{bucket}/{source_key}",
Key=dest_key,
ChecksumAlgorithm=algorithm,
)
assert checksum_key in resp["CopyObjectResult"]
assert resp["CopyObjectResult"][checksum_key] == checksum_by_boto
@mock_s3
@pytest.mark.parametrize(
"algorithm, checksum",
[
("CRC32", "lVk/nw=="),
("SHA1", "jbXkHAsXUrubtL3dqDQ4w+7WXc0="),
("SHA256", "1YQo81vx2VFUl0q5ccWISq8AkSBQQ0WO80S82TmfdIQ="),
],
)
def test_copy_object_calculates_checksum(algorithm, checksum):
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
source_key = "source-key"
dest_key = "dest-key"
bucket = "foobar"
body = b"test-checksum"
client.create_bucket(Bucket=bucket)
checksum_key = f"Checksum{algorithm}"
resp = client.put_object(
Bucket=bucket,
Key=source_key,
Body=body,
)
assert checksum_key not in resp
resp = client.copy_object(
Bucket=bucket,
CopySource=f"{bucket}/{source_key}",
Key=dest_key,
ChecksumAlgorithm=algorithm,
)
assert checksum_key in resp["CopyObjectResult"]
assert resp["CopyObjectResult"][checksum_key] == checksum
@mock_s3
def test_copy_object_keeps_checksum():
client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
source_key = "source-key"
dest_key = "dest-key"
bucket = "foobar"
body = b"test-checksum"
expected_checksum = "1YQo81vx2VFUl0q5ccWISq8AkSBQQ0WO80S82TmfdIQ="
client.create_bucket(Bucket=bucket)
# put an object with a checksum
resp = client.put_object(
Bucket=bucket,
Key=source_key,
Body=body,
ChecksumAlgorithm="SHA256",
)
assert "ChecksumSHA256" in resp
assert resp["ChecksumSHA256"] == expected_checksum
# do not specify the checksum
resp = client.copy_object(
Bucket=bucket,
CopySource=f"{bucket}/{source_key}",
Key=dest_key,
)
# assert that it kept the checksum from the source key
assert "ChecksumSHA256" in resp["CopyObjectResult"]
assert resp["CopyObjectResult"]["ChecksumSHA256"] == expected_checksum

View File

@ -124,24 +124,22 @@ def test_undo_clean_key_name(key, expected):
def test_checksum_sha256(): def test_checksum_sha256():
checksum = b"ODdkMTQ5Y2I0MjRjMDM4NzY1NmYyMTFkMjU4OWZiNWIxZTE2MjI5OTIxMzA5ZTk4NTg4NDE5Y2NjYThhNzM2Mg==" checksum = b"h9FJy0JMA4dlbyEdJYn7Wx4WIpkhMJ6YWIQZzMqKc2I="
compute_checksum(b"somedata", "SHA256").should.equal(checksum) compute_checksum(b"somedata", "SHA256").should.equal(checksum)
# Unknown algorithms fallback to SHA256 for now # Unknown algorithms fallback to SHA256 for now
compute_checksum(b"somedata", algorithm="unknown").should.equal(checksum) compute_checksum(b"somedata", algorithm="unknown").should.equal(checksum)
def test_checksum_sha1(): def test_checksum_sha1():
compute_checksum(b"somedata", "SHA1").should.equal( compute_checksum(b"somedata", "SHA1").should.equal(b"76oxGuRIpzdMEiBhv+2VLZQOnjc=")
b"ZWZhYTMxMWFlNDQ4YTczNzRjMTIyMDYxYmZlZDk1MmQ5NDBlOWUzNw=="
)
def test_checksum_crc32(): def test_checksum_crc32():
compute_checksum(b"somedata", "CRC32").should.equal(b"MTM5MzM0Mzk1Mg==") compute_checksum(b"somedata", "CRC32").should.equal(b"Uwy90A==")
def test_checksum_crc32c(): def test_checksum_crc32c():
compute_checksum(b"somedata", "CRC32C").should.equal(b"MTM5MzM0Mzk1Mg==") compute_checksum(b"somedata", "CRC32C").should.equal(b"Uwy90A==")
def test_cors_utils(): def test_cors_utils():