feature: tag functions in eks(#5203) (#5215)

This commit is contained in:
Michael Merrill 2022-06-14 07:08:04 -04:00 committed by GitHub
parent 631e887b5e
commit 36e4856015
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 140 additions and 4 deletions

View File

@ -49,11 +49,11 @@ eks
- [X] list_fargate_profiles
- [ ] list_identity_provider_configs
- [X] list_nodegroups
- [ ] list_tags_for_resource
- [X] list_tags_for_resource
- [ ] list_updates
- [ ] register_cluster
- [ ] tag_resource
- [ ] untag_resource
- [X] tag_resource
- [X] untag_resource
- [ ] update_addon
- [ ] update_cluster_config
- [ ] update_cluster_version

View File

@ -631,6 +631,78 @@ class EKSBackend(BaseBackend):
cluster.nodegroup_count -= 1
return result
def tag_resource(self, resource_arn, tags):
"""
This function currently will tag an EKS cluster only. It does not tag a managed node group
"""
try:
cluster = next(
self.clusters[x]
for x in self.clusters
if self.clusters[x].arn == resource_arn
)
except StopIteration:
# Cluster does not exist.
raise ResourceNotFoundException(
clusterName=None,
nodegroupName=None,
fargateProfileName=None,
addonName=None,
message="An error occurred (NotFoundException) when calling the TagResource operation: Resource was not found",
)
cluster.tags.update(tags)
return ""
def untag_resource(self, resource_arn, tag_keys):
"""
This function currently will remove tags on an EKS cluster only. It does not remove tags from a managed node group
"""
if not isinstance(tag_keys, list):
tag_keys = [tag_keys]
try:
cluster = next(
self.clusters[x]
for x in self.clusters
if self.clusters[x].arn == resource_arn
)
except StopIteration:
# Cluster does not exist.
raise ResourceNotFoundException(
clusterName=None,
nodegroupName=None,
fargateProfileName=None,
addonName=None,
message="An error occurred (NotFoundException) when calling the UntagResource operation: Resource was not found",
)
for name in tag_keys:
if name in cluster.tags:
del cluster.tags[name]
return ""
def list_tags_for_resource(self, resource_arn):
"""
This function currently will list tags on an EKS cluster only. It does not list tags from a managed node group
"""
try:
cluster = next(
self.clusters[x]
for x in self.clusters
if self.clusters[x].arn == resource_arn
)
except StopIteration:
# Cluster does not exist.
raise ResourceNotFoundException(
clusterName=None,
nodegroupName=None,
fargateProfileName=None,
addonName=None,
message="An error occurred (NotFoundException) when calling the ListTagsForResource operation: Resource was not found",
)
return cluster.tags
def list_clusters(self, max_results, next_token):
return paginated_list(self.clusters.keys(), max_results, next_token)

View File

@ -1,5 +1,5 @@
import json
from urllib.parse import unquote
from moto.core.responses import BaseResponse
from .models import eks_backends
@ -190,3 +190,35 @@ class EKSResponse(BaseResponse):
)
return 200, {}, json.dumps({"nodegroup": dict(nodegroup)})
def tags(self, request, full_url, headers):
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.list_tags_for_resource()
if request.method == "POST":
return self.tag_resource()
if request.method == "DELETE":
return self.untag_resource()
def tag_resource(self):
self.eks_backend.tag_resource(
self._extract_arn_from_path(), self._get_param("tags")
)
return 200, {}, ""
def untag_resource(self):
self.eks_backend.untag_resource(
self._extract_arn_from_path(), self._get_param("tagKeys")
)
return 200, {}, ""
def list_tags_for_resource(self):
tags = self.eks_backend.list_tags_for_resource(self._extract_arn_from_path())
return 200, {}, json.dumps({"tags": tags})
def _extract_arn_from_path(self):
# /tags/arn_that_may_contain_a_slash
path = unquote(self.path)
return "/".join(path.split("/")[2:])

View File

@ -15,4 +15,5 @@ url_paths = {
"{0}/clusters/(?P<name>[^/]+)/node-groups/(?P<nodegroupName>[^/]+)$": response.dispatch,
"{0}/clusters/(?P<name>[^/]+)/fargate-profiles$": response.dispatch,
"{0}/clusters/(?P<name>[^/]+)/fargate-profiles/(?P<fargateProfileName>[^/]+)$": response.dispatch,
"{0}/tags/(?P<resourceArn>.+)$": response.tags,
}

View File

@ -186,6 +186,8 @@ def NodegroupBuilder(ClusterBuilder):
# in the list at initialization, which means the mock
# decorator must be used manually in this one case.
###
@mock_eks
def test_list_clusters_returns_empty_by_default():
client = boto3.client(SERVICE, region_name=REGION)
@ -195,6 +197,35 @@ def test_list_clusters_returns_empty_by_default():
result.should.equal([])
@mock_eks
def test_list_tags_returns_empty_by_default(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SINGLE)
cluster_arn = generated_test_data.cluster_describe_output[ClusterAttributes.ARN]
result = client.list_tags_for_resource(resourceArn=cluster_arn)
assert len(result["tags"]) == 0
@mock_eks
def test_list_tags_returns_all(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SINGLE)
cluster_arn = generated_test_data.cluster_describe_output[ClusterAttributes.ARN]
client.tag_resource(resourceArn=cluster_arn, tags={"key1": "val1", "key2": "val2"})
result = client.list_tags_for_resource(resourceArn=cluster_arn)
assert len(result["tags"]) == 2
result.should.have.key("tags").equals({"key1": "val1", "key2": "val2"})
@mock_eks
def test_list_tags_returns_all_after_delete(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SINGLE)
cluster_arn = generated_test_data.cluster_describe_output[ClusterAttributes.ARN]
client.tag_resource(resourceArn=cluster_arn, tags={"key1": "val1", "key2": "val2"})
client.untag_resource(resourceArn=cluster_arn, tagKeys=["key1"])
result = client.list_tags_for_resource(resourceArn=cluster_arn)
assert len(result["tags"]) == 1
result.should.have.key("tags").equals({"key2": "val2"})
@mock_eks
def test_list_clusters_returns_sorted_cluster_names(ClusterBuilder):
client, generated_test_data = ClusterBuilder(BatchCountSize.SMALL)