diff --git a/moto/s3/models.py b/moto/s3/models.py
index b5224b64a..44a94e7a3 100644
--- a/moto/s3/models.py
+++ b/moto/s3/models.py
@@ -35,7 +35,6 @@ from .exceptions import (
MalformedXML,
InvalidStorageClass,
InvalidTargetBucketForLogging,
- DuplicateTagKeys,
CrossLocationLoggingProhibitted,
NoSuchPublicAccessBlockConfiguration,
InvalidPublicAccessBlockConfiguration,
@@ -473,26 +472,10 @@ def get_canned_acl(acl):
return FakeAcl(grants=grants)
-class FakeTagging(BaseModel):
- def __init__(self, tag_set=None):
- self.tag_set = tag_set or FakeTagSet()
-
-
-class FakeTagSet(BaseModel):
- def __init__(self, tags=None):
- self.tags = tags or []
-
-
-class FakeTag(BaseModel):
- def __init__(self, key, value=None):
- self.key = key
- self.value = value
-
-
class LifecycleFilter(BaseModel):
def __init__(self, prefix=None, tag=None, and_filter=None):
self.prefix = prefix
- self.tag = tag
+ (self.tag_key, self.tag_value) = tag if tag else (None, None)
self.and_filter = and_filter
def to_config_dict(self):
@@ -501,11 +484,11 @@ class LifecycleFilter(BaseModel):
"predicate": {"type": "LifecyclePrefixPredicate", "prefix": self.prefix}
}
- elif self.tag:
+ elif self.tag_key:
return {
"predicate": {
"type": "LifecycleTagPredicate",
- "tag": {"key": self.tag.key, "value": self.tag.value},
+ "tag": {"key": self.tag_key, "value": self.tag_value},
}
}
@@ -529,12 +512,9 @@ class LifecycleAndFilter(BaseModel):
if self.prefix is not None:
data.append({"type": "LifecyclePrefixPredicate", "prefix": self.prefix})
- for tag in self.tags:
+ for key, value in self.tags.items():
data.append(
- {
- "type": "LifecycleTagPredicate",
- "tag": {"key": tag.key, "value": tag.value},
- }
+ {"type": "LifecycleTagPredicate", "tag": {"key": key, "value": value},}
)
return data
@@ -880,7 +860,7 @@ class FakeBucket(BaseModel):
and_filter = None
if rule["Filter"].get("And"):
filters += 1
- and_tags = []
+ and_tags = {}
if rule["Filter"]["And"].get("Tag"):
if not isinstance(rule["Filter"]["And"]["Tag"], list):
rule["Filter"]["And"]["Tag"] = [
@@ -888,7 +868,7 @@ class FakeBucket(BaseModel):
]
for t in rule["Filter"]["And"]["Tag"]:
- and_tags.append(FakeTag(t["Key"], t.get("Value", "")))
+ and_tags[t["Key"]] = t.get("Value", "")
try:
and_prefix = (
@@ -902,7 +882,7 @@ class FakeBucket(BaseModel):
filter_tag = None
if rule["Filter"].get("Tag"):
filters += 1
- filter_tag = FakeTag(
+ filter_tag = (
rule["Filter"]["Tag"]["Key"],
rule["Filter"]["Tag"].get("Value", ""),
)
@@ -989,16 +969,6 @@ class FakeBucket(BaseModel):
def delete_cors(self):
self.cors = []
- def set_tags(self, tagging):
- self.tags = tagging
-
- def delete_tags(self):
- self.tags = FakeTagging()
-
- @property
- def tagging(self):
- return self.tags
-
def set_logging(self, logging_config, bucket_backend):
if not logging_config:
self.logging = {}
@@ -1359,13 +1329,12 @@ class S3Backend(BaseBackend):
def get_key_tags(self, key):
return self.tagger.list_tags_for_resource(key.arn)
- def set_key_tags(self, key, tagging, key_name=None):
+ def set_key_tags(self, key, tags, key_name=None):
if key is None:
raise MissingKey(key_name)
self.tagger.delete_all_tags_for_resource(key.arn)
self.tagger.tag_resource(
- key.arn,
- [{"Key": tag.key, "Value": tag.value} for tag in tagging.tag_set.tags],
+ key.arn, [{"Key": key, "Value": value} for key, value in tags.items()],
)
return key
@@ -1373,15 +1342,11 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
return self.tagger.list_tags_for_resource(bucket.arn)
- def put_bucket_tagging(self, bucket_name, tagging):
- tag_keys = [tag.key for tag in tagging.tag_set.tags]
- if len(tag_keys) != len(set(tag_keys)):
- raise DuplicateTagKeys()
+ def put_bucket_tags(self, bucket_name, tags):
bucket = self.get_bucket(bucket_name)
self.tagger.delete_all_tags_for_resource(bucket.arn)
self.tagger.tag_resource(
- bucket.arn,
- [{"Key": tag.key, "Value": tag.value} for tag in tagging.tag_set.tags],
+ bucket.arn, [{"Key": key, "Value": value} for key, value in tags.items()],
)
def delete_bucket_tagging(self, bucket_name):
diff --git a/moto/s3/responses.py b/moto/s3/responses.py
index 4e3b9a67b..913b20861 100644
--- a/moto/s3/responses.py
+++ b/moto/s3/responses.py
@@ -24,6 +24,7 @@ from moto.s3bucket_path.utils import (
from .exceptions import (
BucketAlreadyExists,
+ DuplicateTagKeys,
S3ClientError,
MissingBucket,
MissingKey,
@@ -43,9 +44,6 @@ from .models import (
FakeGrant,
FakeAcl,
FakeKey,
- FakeTagging,
- FakeTagSet,
- FakeTag,
)
from .utils import (
bucket_name_from_url,
@@ -652,7 +650,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return ""
elif "tagging" in querystring:
tagging = self._bucket_tagging_from_xml(body)
- self.backend.put_bucket_tagging(bucket_name, tagging)
+ self.backend.put_bucket_tags(bucket_name, tagging)
return ""
elif "website" in querystring:
self.backend.set_bucket_website_configuration(bucket_name, body)
@@ -1361,55 +1359,45 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
return None
def _tagging_from_headers(self, headers):
+ tags = {}
if headers.get("x-amz-tagging"):
parsed_header = parse_qs(headers["x-amz-tagging"], keep_blank_values=True)
- tags = []
for tag in parsed_header.items():
- tags.append(FakeTag(tag[0], tag[1][0]))
-
- tag_set = FakeTagSet(tags)
- tagging = FakeTagging(tag_set)
- return tagging
- else:
- return FakeTagging()
+ tags[tag[0]] = tag[1][0]
+ return tags
def _tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml, force_list={"Tag": True})
- tags = []
+ tags = {}
for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]:
- tags.append(FakeTag(tag["Key"], tag["Value"]))
+ tags[tag["Key"]] = tag["Value"]
- tag_set = FakeTagSet(tags)
- tagging = FakeTagging(tag_set)
- return tagging
+ return tags
def _bucket_tagging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
- tags = []
+ tags = {}
# Optional if no tags are being sent:
if parsed_xml["Tagging"].get("TagSet"):
# If there is only 1 tag, then it's not a list:
if not isinstance(parsed_xml["Tagging"]["TagSet"]["Tag"], list):
- tags.append(
- FakeTag(
- parsed_xml["Tagging"]["TagSet"]["Tag"]["Key"],
- parsed_xml["Tagging"]["TagSet"]["Tag"]["Value"],
- )
- )
+ tags[parsed_xml["Tagging"]["TagSet"]["Tag"]["Key"]] = parsed_xml[
+ "Tagging"
+ ]["TagSet"]["Tag"]["Value"]
else:
for tag in parsed_xml["Tagging"]["TagSet"]["Tag"]:
- tags.append(FakeTag(tag["Key"], tag["Value"]))
+ if tag["Key"] in tags:
+ raise DuplicateTagKeys()
+ tags[tag["Key"]] = tag["Value"]
# Verify that "aws:" is not in the tags. If so, then this is a problem:
- for tag in tags:
- if tag.key.startswith("aws:"):
+ for key, _ in tags.items():
+ if key.startswith("aws:"):
raise NoSystemTags()
- tag_set = FakeTagSet(tags)
- tagging = FakeTagging(tag_set)
- return tagging
+ return tags
def _cors_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
@@ -1730,10 +1718,10 @@ S3_BUCKET_LIFECYCLE_CONFIGURATION = """
{% if rule.filter.prefix != None %}
{{ rule.filter.prefix }}
{% endif %}
- {% if rule.filter.tag %}
+ {% if rule.filter.tag_key %}
- {{ rule.filter.tag.key }}
- {{ rule.filter.tag.value }}
+ {{ rule.filter.tag_key }}
+ {{ rule.filter.tag_value }}
{% endif %}
{% if rule.filter.and_filter %}
@@ -1741,10 +1729,10 @@ S3_BUCKET_LIFECYCLE_CONFIGURATION = """
{% if rule.filter.and_filter.prefix != None %}
{{ rule.filter.and_filter.prefix }}
{% endif %}
- {% for tag in rule.filter.and_filter.tags %}
+ {% for key, value in rule.filter.and_filter.tags.items() %}
- {{ tag.key }}
- {{ tag.value }}
+ {{ key }}
+ {{ value }}
{% endfor %}
diff --git a/tests/test_config/test_config.py b/tests/test_config/test_config.py
index 1ffd52a2c..1bf39428e 100644
--- a/tests/test_config/test_config.py
+++ b/tests/test_config/test_config.py
@@ -11,6 +11,8 @@ from moto import mock_s3
from moto.config import mock_config
from moto.core import ACCOUNT_ID
+import sure # noqa
+
@mock_config
def test_put_configuration_recorder():
diff --git a/tests/test_s3/test_s3.py b/tests/test_s3/test_s3.py
index 4ddc160a8..e2acf32f2 100644
--- a/tests/test_s3/test_s3.py
+++ b/tests/test_s3/test_s3.py
@@ -4295,24 +4295,17 @@ def test_s3_config_dict():
FakeAcl,
FakeGrant,
FakeGrantee,
- FakeTag,
- FakeTagging,
- FakeTagSet,
OWNER,
)
# Without any buckets:
assert not s3_config_query.get_config_resource("some_bucket")
- tags = FakeTagging(
- FakeTagSet(
- [FakeTag("someTag", "someValue"), FakeTag("someOtherTag", "someOtherValue")]
- )
- )
+ tags = {"someTag": "someValue", "someOtherTag": "someOtherValue"}
# With 1 bucket in us-west-2:
s3_config_query.backends["global"].create_bucket("bucket1", "us-west-2")
- s3_config_query.backends["global"].put_bucket_tagging("bucket1", tags)
+ s3_config_query.backends["global"].put_bucket_tags("bucket1", tags)
# With a log bucket:
s3_config_query.backends["global"].create_bucket("logbucket", "us-west-2")