Techdebt: Replace sure with assertions in ResourceGroupStagging API (#6693)

This commit is contained in:
kbalk 2023-08-18 03:47:44 -04:00 committed by GitHub
parent 1106e64ed7
commit 6436dcb224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 76 deletions

View File

@ -1,8 +1,9 @@
from uuid import uuid4
import boto3
from moto import mock_glue, mock_resourcegroupstaggingapi
from moto.core import DEFAULT_ACCOUNT_ID
from uuid import uuid4
@mock_glue
@ -14,7 +15,7 @@ def test_glue_jobs():
job_name = glue.create_job(
Name=str(uuid4()),
Role="test_role",
Command=dict(Name="test_command"),
Command={"Name": "test_command"},
Tags={tag_key: tag_val},
)["Name"]
job_arn = f"arn:aws:glue:us-west-1:{DEFAULT_ACCOUNT_ID}:job/{job_name}"

View File

@ -1,6 +1,8 @@
import boto3
import json
import sure # noqa # pylint: disable=unused-import
import boto3
from botocore.client import ClientError
from moto import mock_ec2
from moto import mock_elbv2
from moto import mock_kms
@ -11,7 +13,6 @@ from moto import mock_lambda
from moto import mock_iam
from moto import mock_cloudformation
from moto import mock_ecs
from botocore.client import ClientError
from tests import EXAMPLE_AMI_ID, EXAMPLE_AMI_ID2
@ -47,15 +48,15 @@ def test_get_resources_cloudformation():
rgta_client = boto3.client("resourcegroupstaggingapi", region_name="us-east-1")
resp = rgta_client.get_resources(TagFilters=[{"Key": "tag", "Values": ["one"]}])
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(stack_one)
assert len(resp["ResourceTagMappingList"]) == 1
assert stack_one in resp["ResourceTagMappingList"][0]["ResourceARN"]
resp = rgta_client.get_resources(
TagFilters=[{"Key": "tag", "Values": ["one", "three"]}]
)
resp["ResourceTagMappingList"].should.have.length_of(2)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(stack_one)
resp["ResourceTagMappingList"][1]["ResourceARN"].should.contain(stack_three)
assert len(resp["ResourceTagMappingList"]) == 2
assert stack_one in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert stack_three in resp["ResourceTagMappingList"][1]["ResourceARN"]
kms_client = boto3.client("kms", region_name="us-east-1")
kms_client.create_key(
@ -63,16 +64,16 @@ def test_get_resources_cloudformation():
)
resp = rgta_client.get_resources(TagFilters=[{"Key": "tag", "Values": ["two"]}])
resp["ResourceTagMappingList"].should.have.length_of(2)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(stack_two)
resp["ResourceTagMappingList"][1]["ResourceARN"].should.contain("kms")
assert len(resp["ResourceTagMappingList"]) == 2
assert stack_two in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert "kms" in resp["ResourceTagMappingList"][1]["ResourceARN"]
resp = rgta_client.get_resources(
ResourceTypeFilters=["cloudformation:stack"],
TagFilters=[{"Key": "tag", "Values": ["two"]}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(stack_two)
assert len(resp["ResourceTagMappingList"]) == 1
assert stack_two in resp["ResourceTagMappingList"][0]["ResourceARN"]
@mock_ecs
@ -99,8 +100,8 @@ def test_get_resources_ecs():
rgta_client = boto3.client("resourcegroupstaggingapi", region_name="us-east-1")
resp = rgta_client.get_resources(TagFilters=[{"Key": "tag", "Values": ["a tag"]}])
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(cluster_one)
assert len(resp["ResourceTagMappingList"]) == 1
assert cluster_one in resp["ResourceTagMappingList"][0]["ResourceARN"]
# ecs:service
service_one = (
@ -124,25 +125,25 @@ def test_get_resources_ecs():
)
resp = rgta_client.get_resources(TagFilters=[{"Key": "tag", "Values": ["a tag"]}])
resp["ResourceTagMappingList"].should.have.length_of(2)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(service_one)
resp["ResourceTagMappingList"][1]["ResourceARN"].should.contain(cluster_one)
assert len(resp["ResourceTagMappingList"]) == 2
assert service_one in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert cluster_one in resp["ResourceTagMappingList"][1]["ResourceARN"]
resp = rgta_client.get_resources(
ResourceTypeFilters=["ecs:cluster"],
TagFilters=[{"Key": "tag", "Values": ["b tag"]}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should_not.contain(service_two)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(cluster_two)
assert len(resp["ResourceTagMappingList"]) == 1
assert service_two not in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert cluster_two in resp["ResourceTagMappingList"][0]["ResourceARN"]
resp = rgta_client.get_resources(
ResourceTypeFilters=["ecs:service"],
TagFilters=[{"Key": "tag", "Values": ["b tag"]}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(service_two)
resp["ResourceTagMappingList"][0]["ResourceARN"].should_not.contain(cluster_two)
assert len(resp["ResourceTagMappingList"]) == 1
assert service_two in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert cluster_two not in resp["ResourceTagMappingList"][0]["ResourceARN"]
# ecs:task
resp = client.register_task_definition(
@ -208,18 +209,18 @@ def test_get_resources_ecs():
)
resp = rgta_client.get_resources(TagFilters=[{"Key": "tag", "Values": ["b tag"]}])
resp["ResourceTagMappingList"].should.have.length_of(3)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(service_two)
resp["ResourceTagMappingList"][1]["ResourceARN"].should.contain(cluster_two)
resp["ResourceTagMappingList"][2]["ResourceARN"].should.contain(task_two)
assert len(resp["ResourceTagMappingList"]) == 3
assert service_two in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert cluster_two in resp["ResourceTagMappingList"][1]["ResourceARN"]
assert task_two in resp["ResourceTagMappingList"][2]["ResourceARN"]
resp = rgta_client.get_resources(
ResourceTypeFilters=["ecs:task"],
TagFilters=[{"Key": "tag", "Values": ["a tag"]}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain(task_one)
resp["ResourceTagMappingList"][0]["ResourceARN"].should_not.contain(task_two)
assert len(resp["ResourceTagMappingList"]) == 1
assert task_one in resp["ResourceTagMappingList"][0]["ResourceARN"]
assert task_two not in resp["ResourceTagMappingList"][0]["ResourceARN"]
@mock_rds
@ -255,24 +256,24 @@ def test_get_resources_ec2():
rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1")
resp = rtapi.get_resources()
# Check we have 1 entry for Instance, 1 Entry for AMI
resp["ResourceTagMappingList"].should.have.length_of(2)
assert len(resp["ResourceTagMappingList"]) == 2
# 1 Entry for AMI
resp = rtapi.get_resources(ResourceTypeFilters=["ec2:image"])
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain("image/")
assert len(resp["ResourceTagMappingList"]) == 1
assert "image/" in resp["ResourceTagMappingList"][0]["ResourceARN"]
# As were iterating the same data, this rules out that the test above was a fluke
resp = rtapi.get_resources(ResourceTypeFilters=["ec2:instance"])
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain("instance/")
assert len(resp["ResourceTagMappingList"]) == 1
assert "instance/" in resp["ResourceTagMappingList"][0]["ResourceARN"]
# Basic test of tag filters
resp = rtapi.get_resources(
TagFilters=[{"Key": "MY_TAG1", "Values": ["MY_VALUE1", "some_other_value"]}]
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain("instance/")
assert len(resp["ResourceTagMappingList"]) == 1
assert "instance/" in resp["ResourceTagMappingList"][0]["ResourceARN"]
@mock_ec2
@ -284,8 +285,8 @@ def test_get_resources_ec2_vpc():
def assert_response(resp):
results = resp.get("ResourceTagMappingList", [])
results.should.have.length_of(1)
vpc.id.should.be.within(results[0]["ResourceARN"])
assert len(results) == 1
assert vpc.id in results[0]["ResourceARN"]
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-1")
resp = rtapi.get_resources(ResourceTypeFilters=["ec2"])
@ -324,9 +325,9 @@ def test_get_tag_keys_ec2():
rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1")
resp = rtapi.get_tag_keys()
resp["TagKeys"].should.contain("MY_TAG1")
resp["TagKeys"].should.contain("MY_TAG2")
resp["TagKeys"].should.contain("MY_TAG3")
assert "MY_TAG1" in resp["TagKeys"]
assert "MY_TAG2" in resp["TagKeys"]
assert "MY_TAG3" in resp["TagKeys"]
# TODO test pagenation
@ -378,8 +379,8 @@ def test_get_tag_values_ec2():
rtapi = boto3.client("resourcegroupstaggingapi", region_name="eu-central-1")
resp = rtapi.get_tag_values(Key="MY_TAG1")
resp["TagValues"].should.contain("MY_VALUE1")
resp["TagValues"].should.contain("MY_VALUE4")
assert "MY_VALUE1" in resp["TagValues"]
assert "MY_VALUE4" in resp["TagValues"]
@mock_ec2
@ -434,17 +435,17 @@ def test_get_many_resources():
ResourceTypeFilters=["elasticloadbalancing:loadbalancer"]
)
resp["ResourceTagMappingList"].should.have.length_of(2)
resp["ResourceTagMappingList"][0]["ResourceARN"].should.contain("loadbalancer/")
assert len(resp["ResourceTagMappingList"]) == 2
assert "loadbalancer/" in resp["ResourceTagMappingList"][0]["ResourceARN"]
resp = rtapi.get_resources(
ResourceTypeFilters=["elasticloadbalancing:loadbalancer"],
TagFilters=[{"Key": "key_name"}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["Tags"].should.contain(
{"Key": "key_name", "Value": "a_value"}
)
assert len(resp["ResourceTagMappingList"]) == 1
assert {"Key": "key_name", "Value": "a_value"} in resp["ResourceTagMappingList"][0][
"Tags"
]
# TODO test pagination
@ -479,17 +480,15 @@ def test_get_resources_target_group():
# Basic test
resp = rtapi.get_resources(ResourceTypeFilters=["elasticloadbalancing:targetgroup"])
resp["ResourceTagMappingList"].should.have.length_of(2)
assert len(resp["ResourceTagMappingList"]) == 2
# Test tag filtering
resp = rtapi.get_resources(
ResourceTypeFilters=["elasticloadbalancing:targetgroup"],
TagFilters=[{"Key": "Test", "Values": ["1"]}],
)
resp["ResourceTagMappingList"].should.have.length_of(1)
resp["ResourceTagMappingList"][0]["Tags"].should.contain(
{"Key": "Test", "Value": "1"}
)
assert len(resp["ResourceTagMappingList"]) == 1
assert {"Key": "Test", "Value": "1"} in resp["ResourceTagMappingList"][0]["Tags"]
@mock_s3
@ -519,7 +518,7 @@ def test_get_resources_s3():
for resource in resp["ResourceTagMappingList"]:
response_keys.remove(resource["Tags"][0]["Key"])
response_keys.should.have.length_of(2)
assert len(response_keys) == 2
resp = rtapi.get_resources(
ResourcesPerPage=2, PaginationToken=resp["PaginationToken"]
@ -527,7 +526,7 @@ def test_get_resources_s3():
for resource in resp["ResourceTagMappingList"]:
response_keys.remove(resource["Tags"][0]["Key"])
response_keys.should.have.length_of(0)
assert len(response_keys) == 0
@mock_ec2
@ -584,9 +583,9 @@ def test_multiple_tag_filters():
{"Key": "MY_TAG2", "Values": ["MY_SHARED_VALUE"]},
]
).get("ResourceTagMappingList", [])
results.should.have.length_of(1)
instance_1_id.should.be.within(results[0]["ResourceARN"])
instance_2_id.shouldnt.be.within(results[0]["ResourceARN"])
assert len(results) == 1
assert instance_1_id in results[0]["ResourceARN"]
assert instance_2_id not in results[0]["ResourceARN"]
@mock_rds
@ -600,7 +599,7 @@ def test_get_resources_rds():
DBInstanceIdentifier=f"db-instance-{i}",
Engine="postgres",
DBInstanceClass="db.m1.small",
CopyTagsToSnapshot=True if i else False,
CopyTagsToSnapshot=bool(i),
Tags=[{"Key": "test", "Value": f"value-{i}"}] if i else [],
).get("DBInstance")
snapshot = client.create_db_snapshot(
@ -613,13 +612,13 @@ def test_get_resources_rds():
def assert_response(response, expected_count, resource_type=None):
results = response.get("ResourceTagMappingList", [])
results.should.have.length_of(expected_count)
assert len(results) == expected_count
for item in results:
arn = item["ResourceARN"]
arn.should.be.within(resources_tagged)
arn.should_not.be.within(resources_untagged)
assert arn in resources_tagged
assert arn not in resources_untagged
if resource_type:
sure.this(f":{resource_type}:").should.be.within(arn)
assert f":{resource_type}:" in arn
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
resp = rtapi.get_resources(ResourceTypeFilters=["rds"])
@ -702,9 +701,9 @@ def test_get_resources_lambda():
for item in results:
resultArns.append(item["ResourceARN"])
for arn in resultArns:
arn.should.be.within(expected_arns)
assert arn in expected_arns
for arn in expected_arns:
arn.should.be.within(resultArns)
assert arn in resultArns
rtapi = boto3.client("resourcegroupstaggingapi", region_name="us-west-2")
resp = rtapi.get_resources(ResourceTypeFilters=["lambda"])

View File

@ -1,11 +1,6 @@
import sure # noqa # pylint: disable=unused-import
"""Test the different server responses."""
import moto.server as server
"""
Test the different server responses
"""
def test_resourcegroupstaggingapi_list():
backend = server.create_backend_app("resourcegroupstaggingapi")