Use TaggingService for S3 Buckets

This commit is contained in:
Bert Blommers 2020-03-31 11:10:38 +01:00
parent 7419f527d4
commit 6dd6686afc
3 changed files with 32 additions and 11 deletions

View File

@ -22,6 +22,7 @@ import six
from bisect import insort
from moto.core import ACCOUNT_ID, BaseBackend, BaseModel
from moto.core.utils import iso_8601_datetime_with_milliseconds, rfc_1123_datetime
from moto.utilities.tagging_service import TaggingService
from .exceptions import (
BucketAlreadyExists,
MissingBucket,
@ -787,7 +788,6 @@ class FakeBucket(BaseModel):
self.policy = None
self.website_configuration = None
self.acl = get_canned_acl("private")
self.tags = FakeTagging()
self.cors = []
self.logging = {}
self.notification_configuration = None
@ -1085,6 +1085,10 @@ class FakeBucket(BaseModel):
def set_acl(self, acl):
self.acl = acl
@property
def arn(self):
return "arn:aws:s3:::{}".format(self.name)
@property
def physical_resource_id(self):
return self.name
@ -1110,7 +1114,7 @@ class FakeBucket(BaseModel):
int(time.mktime(self.creation_date.timetuple()))
), # PY2 and 3 compatible
"configurationItemMD5Hash": "",
"arn": "arn:aws:s3:::{}".format(self.name),
"arn": self.arn,
"resourceType": "AWS::S3::Bucket",
"resourceId": self.name,
"resourceName": self.name,
@ -1119,7 +1123,7 @@ class FakeBucket(BaseModel):
"resourceCreationTime": str(self.creation_date),
"relatedEvents": [],
"relationships": [],
"tags": {tag.key: tag.value for tag in self.tagging.tag_set.tags},
"tags": s3_backend.tagger.get_tag_dict_for_resource(self.arn),
"configuration": {
"name": self.name,
"owner": {"id": OWNER},
@ -1181,6 +1185,7 @@ class S3Backend(BaseBackend):
def __init__(self):
self.buckets = {}
self.account_public_access_block = None
self.tagger = TaggingService()
def create_bucket(self, bucket_name, region_name):
if bucket_name in self.buckets:
@ -1357,16 +1362,24 @@ class S3Backend(BaseBackend):
key.set_tagging(tagging)
return key
def get_bucket_tags(self, bucket_name):
bucket = self.get_bucket(bucket_name)
return self.tagger.list_tags_for_resource(bucket.arn)
def put_bucket_tagging(self, bucket_name, tagging):
tag_keys = [tag.key for tag in tagging.tag_set.tags]
if len(tag_keys) != len(set(tag_keys)):
raise DuplicateTagKeys()
bucket = self.get_bucket(bucket_name)
bucket.set_tags(tagging)
self.tagger.delete_all_tags_for_resource(bucket.arn)
self.tagger.tag_resource(
bucket.arn,
[{"Key": tag.key, "Value": tag.value} for tag in tagging.tag_set.tags],
)
def delete_bucket_tagging(self, bucket_name):
bucket = self.get_bucket(bucket_name)
bucket.delete_tags()
self.tagger.delete_all_tags_for_resource(bucket.arn)
def put_bucket_cors(self, bucket_name, cors_rules):
bucket = self.get_bucket(bucket_name)

View File

@ -378,13 +378,13 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
template = self.response_template(S3_OBJECT_ACL_RESPONSE)
return template.render(obj=bucket)
elif "tagging" in querystring:
bucket = self.backend.get_bucket(bucket_name)
tags = self.backend.get_bucket_tags(bucket_name)["Tags"]
# "Special Error" if no tags:
if len(bucket.tagging.tag_set.tags) == 0:
if len(tags) == 0:
template = self.response_template(S3_NO_BUCKET_TAGGING)
return 404, {}, template.render(bucket_name=bucket_name)
template = self.response_template(S3_BUCKET_TAGGING_RESPONSE)
return template.render(bucket=bucket)
return template.render(tags=tags)
elif "logging" in querystring:
bucket = self.backend.get_bucket(bucket_name)
if not bucket.logging:
@ -1929,7 +1929,7 @@ S3_OBJECT_TAGGING_RESPONSE = """\
S3_BUCKET_TAGGING_RESPONSE = """<?xml version="1.0" encoding="UTF-8"?>
<Tagging>
<TagSet>
{% for tag in bucket.tagging.tag_set.tags %}
{% for tag in tags %}
<Tag>
<Key>{{ tag.key }}</Key>
<Value>{{ tag.value }}</Value>

View File

@ -5,15 +5,23 @@ class TaggingService:
self.valueName = valueName
self.tags = {}
def get_tag_dict_for_resource(self, arn):
result = {}
if self.has_tags(arn):
for k, v in self.tags[arn].items():
result[k] = v
return result
def list_tags_for_resource(self, arn):
result = []
if arn in self.tags:
if self.has_tags(arn):
for k, v in self.tags[arn].items():
result.append({self.keyName: k, self.valueName: v})
return {self.tagName: result}
def delete_all_tags_for_resource(self, arn):
del self.tags[arn]
if self.has_tags(arn):
del self.tags[arn]
def has_tags(self, arn):
return arn in self.tags