S3: put_object_tagging() now validates the number of tags provided (#7112)

This commit is contained in:
Bert Blommers 2023-12-10 20:22:52 -01:00 committed by GitHub
parent 57d8f23926
commit faeab5cd99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 79 additions and 61 deletions

View File

@ -6257,7 +6257,7 @@
## s3
<details>
<summary>65% implemented</summary>
<summary>66% implemented</summary>
- [X] abort_multipart_upload
- [X] complete_multipart_upload
@ -6351,7 +6351,7 @@
- [X] put_object_legal_hold
- [X] put_object_lock_configuration
- [X] put_object_retention
- [ ] put_object_tagging
- [X] put_object_tagging
- [X] put_public_access_block
- [ ] restore_object
- [X] select_object_content

View File

@ -144,7 +144,7 @@ s3
- [X] put_object_legal_hold
- [X] put_object_lock_configuration
- [X] put_object_retention
- [ ] put_object_tagging
- [X] put_object_tagging
- [X] put_public_access_block
- [ ] restore_object
- [X] select_object_content

View File

@ -65,6 +65,13 @@ class AccessForbidden(S3ClientError):
super().__init__("AccessForbidden", msg)
class BadRequest(S3ClientError):
code = 403
def __init__(self, msg: str):
super().__init__("BadRequest", msg)
class BucketError(S3ClientError):
def __init__(self, *args: Any, **kwargs: Any):
kwargs.setdefault("template", "bucket_error")

View File

@ -34,6 +34,7 @@ from moto.moto_api._internal import mock_random as random
from moto.moto_api._internal.managed_state_model import ManagedState
from moto.s3.exceptions import (
AccessDeniedByLock,
BadRequest,
BucketAlreadyExists,
BucketNeedsToBeNew,
CopyObjectMustChangeSomething,
@ -2161,7 +2162,7 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
def get_object_tagging(self, key: FakeKey) -> Dict[str, List[Dict[str, str]]]:
return self.tagger.list_tags_for_resource(key.arn)
def set_key_tags(
def put_object_tagging(
self,
key: Optional[FakeKey],
tags: Optional[Dict[str, str]],
@ -2169,12 +2170,19 @@ class S3Backend(BaseBackend, CloudWatchMetricProvider):
) -> FakeKey:
if key is None:
raise MissingKey(key=key_name)
boto_tags_dict = self.tagger.convert_dict_to_tags_input(tags)
errmsg = self.tagger.validate_tags(boto_tags_dict)
tags_input = self.tagger.convert_dict_to_tags_input(tags)
# Validation custom to S3
if tags:
if len(tags_input) > 10:
raise BadRequest("Object tags cannot be greater than 10")
if any([tagkey.startswith("aws") for tagkey in tags.keys()]):
raise InvalidTagError("Your TagKey cannot be prefixed with aws:")
# Validation shared across all services
errmsg = self.tagger.validate_tags(tags_input)
if errmsg:
raise InvalidTagError(errmsg)
self.tagger.delete_all_tags_for_resource(key.arn)
self.tagger.tag_resource(key.arn, boto_tags_dict)
self.tagger.tag_resource(key.arn, tags_input)
return key
def get_bucket_tagging(self, bucket_name: str) -> Dict[str, List[Dict[str, str]]]:

View File

@ -888,7 +888,7 @@ class S3Response(BaseResponse):
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_body()
self.backend.put_bucket_tagging(bucket_name, tagging)
return ""
return 204, {}, ""
elif "website" in querystring:
self.backend.set_bucket_website_configuration(bucket_name, self.body)
return ""
@ -1629,7 +1629,7 @@ class S3Response(BaseResponse):
bucket_name, key_name, version_id=version_id
)
tagging = self._tagging_from_xml(body)
self.backend.set_key_tags(key_to_tag, tagging, key_name)
self.backend.put_object_tagging(key_to_tag, tagging, key_name)
return 200, response_headers, ""
if "x-amz-copy-source" in request.headers:
@ -1692,7 +1692,7 @@ class S3Response(BaseResponse):
tdirective = request.headers.get("x-amz-tagging-directive")
if tdirective == "REPLACE":
tagging = self._tagging_from_headers(request.headers)
self.backend.set_key_tags(new_key, tagging)
self.backend.put_object_tagging(new_key, tagging)
if key_to_copy.version_id != "null":
response_headers[
"x-amz-copy-source-version-id"
@ -1742,7 +1742,7 @@ class S3Response(BaseResponse):
)
if checksum_algorithm:
new_key.checksum_algorithm = checksum_algorithm
self.backend.set_key_tags(new_key, tagging)
self.backend.put_object_tagging(new_key, tagging)
response_headers.update(new_key.response_dict)
# Remove content-length - the response body is empty for this request
@ -2279,7 +2279,7 @@ class S3Response(BaseResponse):
)
key.checksum_value = checksum
self.backend.set_key_tags(key, multipart.tags)
self.backend.put_object_tagging(key, multipart.tags)
self.backend.put_object_acl(
bucket_name=bucket_name,
key_name=key.name,

View File

@ -6,6 +6,7 @@ from botocore.client import ClientError
from moto import mock_s3, settings
from moto.s3.responses import DEFAULT_REGION_NAME
from . import s3_aws_verified
from .test_s3 import add_proxy_details
@ -21,12 +22,11 @@ def test_get_bucket_tagging_unknown_bucket():
)
@mock_s3
def test_put_object_with_tagging():
@pytest.mark.aws_verified
@s3_aws_verified
def test_put_object_with_tagging(bucket_name=None):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-tags"
s3_client.create_bucket(Bucket=bucket_name)
# using system tags will fail
with pytest.raises(ClientError) as err:
@ -34,34 +34,34 @@ def test_put_object_with_tagging():
Bucket=bucket_name, Key=key, Body="test", Tagging="aws:foo=bar"
)
err_value = err.value
assert err_value.response["Error"]["Code"] == "InvalidTag"
err = err.value.response["Error"]
assert err["Code"] == "InvalidTag"
assert err["Message"] == "Your TagKey cannot be prefixed with aws:"
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test", Tagging="foo=bar")
assert {"Key": "foo", "Value": "bar"} in s3_client.get_object_tagging(
Bucket=bucket_name, Key=key
)["TagSet"]
tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
assert {"Key": "foo", "Value": "bar"} in tags["TagSet"]
resp = s3_client.get_object(Bucket=bucket_name, Key=key)
assert resp["TagCount"] == 1
s3_client.delete_object_tagging(Bucket=bucket_name, Key=key)
assert s3_client.get_object_tagging(Bucket=bucket_name, Key=key)["TagSet"] == []
tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
assert tags["TagSet"] == []
@mock_s3
def test_put_bucket_tagging():
@pytest.mark.aws_verified
@s3_aws_verified
def test_put_bucket_tagging(bucket_name=None):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
s3_client.create_bucket(Bucket=bucket_name)
# With 1 tag:
resp = s3_client.put_bucket_tagging(
Bucket=bucket_name, Tagging={"TagSet": [{"Key": "TagOne", "Value": "ValueOne"}]}
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204
# With multiple tags:
resp = s3_client.put_bucket_tagging(
@ -75,11 +75,7 @@ def test_put_bucket_tagging():
},
)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
# No tags is also OK:
resp = s3_client.put_bucket_tagging(Bucket=bucket_name, Tagging={"TagSet": []})
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204
# With duplicate tag keys:
with pytest.raises(ClientError) as err:
@ -92,11 +88,9 @@ def test_put_bucket_tagging():
]
},
)
err_value = err.value
assert err_value.response["Error"]["Code"] == "InvalidTag"
assert err_value.response["Error"]["Message"] == (
"Cannot provide multiple Tags with the same key"
)
err = err.value.response["Error"]
assert err["Code"] == "InvalidTag"
assert err["Message"] == "Cannot provide multiple Tags with the same key"
# Cannot put tags that are "system" tags - i.e. tags that start with "aws:"
with pytest.raises(ClientError) as ce_exc:
@ -104,11 +98,9 @@ def test_put_bucket_tagging():
Bucket=bucket_name,
Tagging={"TagSet": [{"Key": "aws:sometag", "Value": "nope"}]},
)
err_value = ce_exc.value
assert err_value.response["Error"]["Code"] == "InvalidTag"
assert err_value.response["Error"]["Message"] == (
"System tags cannot be added/updated by requester"
)
err = ce_exc.value.response["Error"]
assert err["Code"] == "InvalidTag"
assert err["Message"] == "System tags cannot be added/updated by requester"
# This is OK though:
s3_client.put_bucket_tagging(
@ -117,11 +109,11 @@ def test_put_bucket_tagging():
)
@mock_s3
def test_get_bucket_tagging():
@pytest.mark.aws_verified
@s3_aws_verified
def test_get_bucket_tagging(bucket_name=None):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
s3_client.create_bucket(Bucket=bucket_name)
s3_client.put_bucket_tagging(
Bucket=bucket_name,
Tagging={
@ -136,16 +128,18 @@ def test_get_bucket_tagging():
resp = s3_client.get_bucket_tagging(Bucket=bucket_name)
assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200
assert len(resp["TagSet"]) == 2
assert {"Key": "TagOne", "Value": "ValueOne"} in resp["TagSet"]
assert {"Key": "TagTwo", "Value": "ValueTwo"} in resp["TagSet"]
# With no tags:
s3_client.put_bucket_tagging(Bucket=bucket_name, Tagging={"TagSet": []})
with pytest.raises(ClientError) as err:
with pytest.raises(ClientError) as exc:
s3_client.get_bucket_tagging(Bucket=bucket_name)
err_value = err.value
assert err_value.response["Error"]["Code"] == "NoSuchTagSet"
assert err_value.response["Error"]["Message"] == "The TagSet does not exist"
err = exc.value.response["Error"]
assert err["Code"] == "NoSuchTagSet"
assert err["Message"] == "The TagSet does not exist"
@mock_s3
@ -175,12 +169,11 @@ def test_delete_bucket_tagging():
assert err_value.response["Error"]["Message"] == "The TagSet does not exist"
@mock_s3
def test_put_object_tagging():
@pytest.mark.aws_verified
@s3_aws_verified
def test_put_object_tagging(bucket_name=None):
s3_client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
bucket_name = "mybucket"
key = "key-with-tags"
s3_client.create_bucket(Bucket=bucket_name)
with pytest.raises(ClientError) as err:
s3_client.put_object_tagging(
@ -194,13 +187,10 @@ def test_put_object_tagging():
},
)
err_value = err.value
assert err_value.response["Error"] == {
"Code": "NoSuchKey",
"Message": "The specified key does not exist.",
"Key": "key-with-tags",
"RequestID": "7a62c49f-347e-4fc4-9331-6e8eEXAMPLE",
}
err = err.value.response["Error"]
assert err["Code"] == "NoSuchKey"
assert err["Message"] == "The specified key does not exist."
assert err["Key"] == key
s3_client.put_object(Bucket=bucket_name, Key=key, Body="test")
@ -215,6 +205,19 @@ def test_put_object_tagging():
err_value = err.value
assert err_value.response["Error"]["Code"] == "InvalidTag"
# Can't put more than 10 tags at the same time
with pytest.raises(ClientError) as exc:
s3_client.put_object_tagging(
Bucket=bucket_name,
Key=key,
Tagging={
"TagSet": [{"Key": f"tag{i}", "Value": "too_many"} for i in range(11)]
},
)
err = exc.value.response["Error"]
assert err["Code"] == "BadRequest"
assert err["Message"] == "Object tags cannot be greater than 10"
resp = s3_client.put_object_tagging(
Bucket=bucket_name,
Key=key,