diff --git a/moto/s3/exceptions.py b/moto/s3/exceptions.py index e6571f7c7..12f5eaa1f 100644 --- a/moto/s3/exceptions.py +++ b/moto/s3/exceptions.py @@ -559,3 +559,10 @@ class ObjectLockConfigurationNotFoundError(S3ClientError): "ObjectLockConfigurationNotFoundError", "Object Lock configuration does not exist for this bucket", ) + + +class MethodNotAllowed(S3ClientError): + code = 405 + + def __init__(self) -> None: + super().__init__("MethodNotAllowed", "Method Not Allowed") diff --git a/moto/s3/models.py b/moto/s3/models.py index ac73475f0..bd42fe479 100644 --- a/moto/s3/models.py +++ b/moto/s3/models.py @@ -42,6 +42,7 @@ from moto.s3.exceptions import ( MissingKey, InvalidNotificationDestination, MalformedXML, + MethodNotAllowed, InvalidStorageClass, InvalidTargetBucketForLogging, CrossLocationLoggingProhibitted, @@ -2068,6 +2069,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): key_name: str, version_id: Optional[str] = None, part_number: Optional[str] = None, + return_delete_marker: bool = False, ) -> Optional[FakeKey]: bucket = self.get_bucket(bucket_name) @@ -2090,6 +2092,8 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): key.advance() return key else: + if return_delete_marker and isinstance(key, FakeDeleteMarker): + return key # type: ignore return None def head_object( @@ -2099,7 +2103,12 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider): version_id: Optional[str] = None, part_number: Optional[str] = None, ) -> Optional[FakeKey]: - return self.get_object(bucket_name, key_name, version_id, part_number) + obj = self.get_object( + bucket_name, key_name, version_id, part_number, return_delete_marker=True + ) + if isinstance(obj, FakeDeleteMarker): + raise MethodNotAllowed + return obj def get_object_acl(self, key: FakeKey) -> Optional[FakeAcl]: return key.acl diff --git a/moto/s3/responses.py b/moto/s3/responses.py index 89be2257e..54dc48b97 100644 --- a/moto/s3/responses.py +++ b/moto/s3/responses.py @@ -34,6 +34,7 @@ from .exceptions import ( InvalidContentMD5, InvalidContinuationToken, S3ClientError, + MethodNotAllowed, MissingBucket, MissingKey, MissingVersion, @@ -1761,9 +1762,18 @@ class S3Response(BaseResponse): if_none_match = headers.get("If-None-Match", None) if_unmodified_since = headers.get("If-Unmodified-Since", None) - key = self.backend.head_object( - bucket_name, key_name, version_id=version_id, part_number=part_number - ) + try: + key = self.backend.head_object( + bucket_name, key_name, version_id=version_id, part_number=part_number + ) + except MethodNotAllowed: + headers = { + "x-amz-delete-marker": "true", + "x-amz-version-id": version_id, + "allow": "DELETE", + "content-type": "application/xml", + } + return 405, headers, "Method Not Allowed" if key: response_headers.update(key.metadata) response_headers.update(key.response_dict) diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py index 1c06a3a0d..52e3a598c 100644 --- a/tests/test_s3/test_s3.py +++ b/tests/test_s3/test_s3.py @@ -1684,6 +1684,9 @@ def test_delete_versioned_bucket(): @s3_aws_verified def test_delete_versioned_bucket_returns_metadata(name=None): client = boto3.client("s3", region_name=DEFAULT_REGION_NAME) + resource = boto3.resource("s3", region_name=DEFAULT_REGION_NAME) + bucket = resource.Bucket(name) + versions = bucket.object_versions client.put_bucket_versioning( Bucket=name, VersioningConfiguration={"Status": "Enabled"} @@ -1696,12 +1699,30 @@ def test_delete_versioned_bucket_returns_metadata(name=None): # Delete the object del_file = client.delete_object(Bucket=name, Key="test1") + deleted_version_id = del_file["VersionId"] assert del_file["DeleteMarker"] is True - assert del_file["VersionId"] is not None + assert deleted_version_id is not None # We now have one DeleteMarker assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 1 + # list_object_versions returns the object itself, and a DeleteMarker + # object.head() returns a 'x-amz-delete-marker' header + # delete_marker.head() returns a 405 + for version in versions.filter(Prefix="test1"): + if version.version_id == deleted_version_id: + with pytest.raises(ClientError) as exc: + version.head() + err = exc.value.response + assert err["Error"] == {"Code": "405", "Message": "Method Not Allowed"} + assert err["ResponseMetadata"]["HTTPStatusCode"] == 405 + assert ( + err["ResponseMetadata"]["HTTPHeaders"]["x-amz-delete-marker"] == "true" + ) + assert err["ResponseMetadata"]["HTTPHeaders"]["allow"] == "DELETE" + else: + assert version.head()["ResponseMetadata"]["HTTPStatusCode"] == 200 + # Delete the same object gives a new version id del_mrk1 = client.delete_object(Bucket=name, Key="test1") assert del_mrk1["DeleteMarker"] is True @@ -1717,6 +1738,20 @@ def test_delete_versioned_bucket_returns_metadata(name=None): assert del_mrk2["DeleteMarker"] is True assert del_mrk2["VersionId"] == del_mrk1["VersionId"] + for version in versions.filter(Prefix="test1"): + if version.version_id == deleted_version_id: + with pytest.raises(ClientError) as exc: + version.head() + err = exc.value.response + assert err["Error"] == {"Code": "405", "Message": "Method Not Allowed"} + assert err["ResponseMetadata"]["HTTPStatusCode"] == 405 + assert ( + err["ResponseMetadata"]["HTTPHeaders"]["x-amz-delete-marker"] == "true" + ) + assert err["ResponseMetadata"]["HTTPHeaders"]["allow"] == "DELETE" + else: + assert version.head()["ResponseMetadata"]["HTTPStatusCode"] == 200 + # We now have only one DeleteMarker assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 1 @@ -1732,6 +1767,13 @@ def test_delete_versioned_bucket_returns_metadata(name=None): assert len(client.list_object_versions(Bucket=name)["DeleteMarkers"]) == 1 assert "Versions" not in client.list_object_versions(Bucket=name) + # Because we only have DeleteMarkers, we can not call `head()` on any of othem + for version in versions.filter(Prefix="test1"): + with pytest.raises(ClientError) as exc: + version.head() + err = exc.value.response + assert err["Error"] == {"Code": "405", "Message": "Method Not Allowed"} + # Delete the last marker del_mrk4 = client.delete_object( Bucket=name, Key="test1", VersionId=del_mrk2["VersionId"]