Signer: Add tagging methods (#6959)

This commit is contained in:
Bert Blommers 2023-10-28 08:10:49 +00:00 committed by GitHub
parent a485eb45b8
commit 42ecdccea7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 93 additions and 35 deletions

View File

@ -6915,7 +6915,7 @@
## signer
<details>
<summary>21% implemented</summary>
<summary>36% implemented</summary>
- [ ] add_profile_permission
- [X] cancel_signing_profile
@ -6927,15 +6927,15 @@
- [ ] list_signing_jobs
- [X] list_signing_platforms
- [ ] list_signing_profiles
- [ ] list_tags_for_resource
- [X] list_tags_for_resource
- [X] put_signing_profile
- [ ] remove_profile_permission
- [ ] revoke_signature
- [ ] revoke_signing_profile
- [ ] sign_payload
- [ ] start_signing_job
- [ ] tag_resource
- [ ] untag_resource
- [X] tag_resource
- [X] untag_resource
</details>
## sns

View File

@ -41,10 +41,10 @@ signer
- [ ] list_signing_profiles
- [ ] list_tags_for_resource
- [X] list_tags_for_resource
- [X] put_signing_profile
The following parameters are not yet implemented: SigningMaterial, Overrides, SigningParamaters
The following parameters are not yet implemented: Overrides, SigningParameters
- [ ] remove_profile_permission
@ -52,6 +52,6 @@ signer
- [ ] revoke_signing_profile
- [ ] sign_payload
- [ ] start_signing_job
- [ ] tag_resource
- [ ] untag_resource
- [X] tag_resource
- [X] untag_resource

View File

@ -2,17 +2,17 @@ from typing import Any, Dict, List, Optional
from moto.core import BaseBackend, BackendDict, BaseModel
from moto.moto_api._internal import mock_random
from moto.utilities.tagging_service import TaggingService
class SigningProfile(BaseModel):
def __init__(
self,
account_id: str,
region: str,
backend: "SignerBackend",
name: str,
platform_id: str,
signing_material: Dict[str, str],
signature_validity_period: Optional[Dict[str, Any]],
tags: Dict[str, str],
):
self.name = name
self.platform_id = platform_id
@ -20,12 +20,13 @@ class SigningProfile(BaseModel):
"value": 135,
"type": "MONTHS",
}
self.tags = tags
self.backend = backend
self.status = "Active"
self.arn = f"arn:aws:signer:{region}:{account_id}:/signing-profiles/{name}"
self.arn = f"arn:aws:signer:{backend.region_name}:{backend.account_id}:/signing-profiles/{name}"
self.profile_version = mock_random.get_random_hex(10)
self.profile_version_arn = f"{self.arn}/{self.profile_version}"
self.signing_material = signing_material
def cancel(self) -> None:
self.status = "Canceled"
@ -43,7 +44,7 @@ class SigningProfile(BaseModel):
"profileName": self.name,
"platformId": self.platform_id,
"signatureValidityPeriod": self.signature_validity_period,
"signingMaterial": {},
"signingMaterial": self.signing_material,
"platformDisplayName": next(
(
p["displayName"]
@ -54,8 +55,9 @@ class SigningProfile(BaseModel):
),
}
)
if self.tags:
small.update({"tags": self.tags})
tags = self.backend.list_tags_for_resource(self.arn)
if tags:
small.update({"tags": tags})
return small
@ -160,6 +162,7 @@ class SignerBackend(BaseBackend):
def __init__(self, region_name: str, account_id: str):
super().__init__(region_name, account_id)
self.signing_profiles: Dict[str, SigningProfile] = dict()
self.tagger = TaggingService()
def cancel_signing_profile(self, profile_name: str) -> None:
self.signing_profiles[profile_name].cancel()
@ -172,20 +175,21 @@ class SignerBackend(BaseBackend):
profile_name: str,
signature_validity_period: Optional[Dict[str, Any]],
platform_id: str,
signing_material: Dict[str, str],
tags: Dict[str, str],
) -> SigningProfile:
"""
The following parameters are not yet implemented: SigningMaterial, Overrides, SigningParamaters
The following parameters are not yet implemented: Overrides, SigningParameters
"""
profile = SigningProfile(
account_id=self.account_id,
region=self.region_name,
backend=self,
name=profile_name,
platform_id=platform_id,
signing_material=signing_material,
signature_validity_period=signature_validity_period,
tags=tags,
)
self.signing_profiles[profile_name] = profile
self.tag_resource(profile.arn, tags)
return profile
def list_signing_platforms(self) -> List[Dict[str, Any]]:
@ -194,6 +198,17 @@ class SignerBackend(BaseBackend):
"""
return SignerBackend.platforms
def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
return self.tagger.get_tag_dict_for_resource(resource_arn)
def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
self.tagger.tag_resource(
resource_arn, TaggingService.convert_dict_to_tags_input(tags)
)
def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
# Using the lambda-regions
# boto3.Session().get_available_regions("signer") still returns an empty list

View File

@ -1,11 +1,13 @@
"""Handles incoming signer requests, invokes methods, returns responses."""
import json
from typing import Any
from urllib.parse import unquote
from moto.core.responses import BaseResponse
from .models import signer_backends, SignerBackend
class signerResponse(BaseResponse):
class SignerResponse(BaseResponse):
def __init__(self) -> None:
super().__init__(service_name="signer")
@ -30,10 +32,12 @@ class signerResponse(BaseResponse):
signature_validity_period = params.get("signatureValidityPeriod")
platform_id = params.get("platformId")
tags = params.get("tags")
signing_material = params.get("signingMaterial")
profile = self.signer_backend.put_signing_profile(
profile_name=profile_name,
signature_validity_period=signature_validity_period,
platform_id=platform_id,
signing_material=signing_material,
tags=tags,
)
return json.dumps(profile.to_dict(full=False))
@ -41,3 +45,30 @@ class signerResponse(BaseResponse):
def list_signing_platforms(self) -> str:
platforms = self.signer_backend.list_signing_platforms()
return json.dumps(dict(platforms=platforms))
def list_tags_for_resource(self) -> str:
resource_arn = unquote(self.path.split("/tags/")[-1])
return json.dumps(
{"tags": self.signer_backend.list_tags_for_resource(resource_arn)}
)
def tag_resource(self) -> str:
resource_arn = unquote(self.path.split("/tags/")[-1])
tags = self._get_param("tags")
self.signer_backend.tag_resource(resource_arn, tags)
return "{}"
def untag_resource(self) -> str:
resource_arn = unquote(self.path.split("/tags/")[-1])
tag_keys = self.querystring.get("tagKeys")
self.signer_backend.untag_resource(resource_arn, tag_keys) # type: ignore
return "{}"
def tags(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return]
self.setup_class(request, full_url, headers)
if request.method == "GET":
return self.list_tags_for_resource()
if request.method == "POST":
return self.tag_resource()
if request.method == "DELETE":
return self.untag_resource()

View File

@ -1,15 +1,16 @@
"""signer base URL and path."""
from .responses import signerResponse
from .responses import SignerResponse
url_bases = [
r"https?://signer\.(.+)\.amazonaws\.com",
]
response = signerResponse()
url_paths = {
"{0}/signing-profiles/(?P<profile_name>[^/]+)$": response.dispatch,
"{0}/signing-platforms$": response.dispatch,
"{0}/tags/(?P<profile_arn>[^/]+)$": SignerResponse.dispatch,
"{0}/tags/(?P<arn_prefix>[^/]+)/signing-profiles/(?P<profile_name>[^/]+)$": SignerResponse.method_dispatch(
SignerResponse.tags # type: ignore
),
"{0}/signing-profiles/(?P<profile_name>[^/]+)$": SignerResponse.dispatch,
"{0}/signing-platforms$": SignerResponse.dispatch,
}

View File

@ -20,9 +20,7 @@ def test_put_signing_profile():
@mock_signer
def test_get_signing_profile():
client = boto3.client("signer", region_name="eu-west-1")
resp = client.put_signing_profile(
profileName="prof1", platformId="AWSLambda-SHA384-ECDSA"
)
client.put_signing_profile(profileName="prof1", platformId="AWSLambda-SHA384-ECDSA")
resp = client.get_signing_profile(profileName="prof1")
@ -38,25 +36,38 @@ def test_get_signing_profile():
@mock_signer
def test_get_signing_profile__with_args():
client = boto3.client("signer", region_name="eu-west-1")
resp = client.put_signing_profile(
profile_arn = client.put_signing_profile(
profileName="prof1",
platformId="AWSLambda-SHA384-ECDSA",
signatureValidityPeriod={"type": "DAYS", "value": 10},
signingMaterial={"certificateArn": "some arn"},
tags={"k1": "v1", "k2": "v2"},
)
)["arn"]
resp = client.get_signing_profile(profileName="prof1")
assert resp["signatureValidityPeriod"] == {"type": "DAYS", "value": 10}
assert resp["tags"] == {"k1": "v1", "k2": "v2"}
assert resp["signingMaterial"] == {"certificateArn": "some arn"}
tag_list = client.list_tags_for_resource(resourceArn=profile_arn)["tags"]
assert tag_list == {"k1": "v1", "k2": "v2"}
client.tag_resource(resourceArn=profile_arn, tags={"k3": "v3"})
tag_list = client.list_tags_for_resource(resourceArn=profile_arn)["tags"]
assert tag_list == {"k1": "v1", "k2": "v2", "k3": "v3"}
client.untag_resource(resourceArn=profile_arn, tagKeys=["k2"])
tag_list = client.list_tags_for_resource(resourceArn=profile_arn)["tags"]
assert tag_list == {"k1": "v1", "k3": "v3"}
@mock_signer
def test_cancel_signing_profile():
client = boto3.client("signer", region_name="eu-west-1")
resp = client.put_signing_profile(
profileName="prof1", platformId="AWSLambda-SHA384-ECDSA"
)
client.put_signing_profile(profileName="prof1", platformId="AWSLambda-SHA384-ECDSA")
client.cancel_signing_profile(profileName="prof1")