Service: ResilienceHub (#7456)
This commit is contained in:
parent
bfe1c12823
commit
3ef0f94fd5
@ -6216,6 +6216,66 @@
|
||||
- [ ] update_stream_processor
|
||||
</details>
|
||||
|
||||
## resiliencehub
|
||||
<details>
|
||||
<summary>18% implemented</summary>
|
||||
|
||||
- [ ] add_draft_app_version_resource_mappings
|
||||
- [ ] batch_update_recommendation_status
|
||||
- [X] create_app
|
||||
- [ ] create_app_version_app_component
|
||||
- [ ] create_app_version_resource
|
||||
- [ ] create_recommendation_template
|
||||
- [X] create_resiliency_policy
|
||||
- [ ] delete_app
|
||||
- [ ] delete_app_assessment
|
||||
- [ ] delete_app_input_source
|
||||
- [ ] delete_app_version_app_component
|
||||
- [ ] delete_app_version_resource
|
||||
- [ ] delete_recommendation_template
|
||||
- [ ] delete_resiliency_policy
|
||||
- [X] describe_app
|
||||
- [ ] describe_app_assessment
|
||||
- [ ] describe_app_version
|
||||
- [ ] describe_app_version_app_component
|
||||
- [ ] describe_app_version_resource
|
||||
- [ ] describe_app_version_resources_resolution_status
|
||||
- [ ] describe_app_version_template
|
||||
- [ ] describe_draft_app_version_resources_import_status
|
||||
- [X] describe_resiliency_policy
|
||||
- [ ] import_resources_to_draft_app_version
|
||||
- [ ] list_alarm_recommendations
|
||||
- [ ] list_app_assessment_compliance_drifts
|
||||
- [X] list_app_assessments
|
||||
- [ ] list_app_component_compliances
|
||||
- [ ] list_app_component_recommendations
|
||||
- [ ] list_app_input_sources
|
||||
- [ ] list_app_version_app_components
|
||||
- [ ] list_app_version_resource_mappings
|
||||
- [ ] list_app_version_resources
|
||||
- [ ] list_app_versions
|
||||
- [X] list_apps
|
||||
- [ ] list_recommendation_templates
|
||||
- [X] list_resiliency_policies
|
||||
- [ ] list_sop_recommendations
|
||||
- [ ] list_suggested_resiliency_policies
|
||||
- [X] list_tags_for_resource
|
||||
- [ ] list_test_recommendations
|
||||
- [ ] list_unsupported_app_version_resources
|
||||
- [ ] publish_app_version
|
||||
- [ ] put_draft_app_version_template
|
||||
- [ ] remove_draft_app_version_resource_mappings
|
||||
- [ ] resolve_app_version_resources
|
||||
- [ ] start_app_assessment
|
||||
- [X] tag_resource
|
||||
- [X] untag_resource
|
||||
- [ ] update_app
|
||||
- [ ] update_app_version
|
||||
- [ ] update_app_version_app_component
|
||||
- [ ] update_app_version_resource
|
||||
- [ ] update_resiliency_policy
|
||||
</details>
|
||||
|
||||
## resource-groups
|
||||
<details>
|
||||
<summary>61% implemented</summary>
|
||||
@ -8159,7 +8219,6 @@
|
||||
- rbin
|
||||
- redshift-serverless
|
||||
- repostspace
|
||||
- resiliencehub
|
||||
- resource-explorer-2
|
||||
- rolesanywhere
|
||||
- route53-recovery-cluster
|
||||
|
83
docs/docs/services/resiliencehub.rst
Normal file
83
docs/docs/services/resiliencehub.rst
Normal file
@ -0,0 +1,83 @@
|
||||
.. _implementedservice_resiliencehub:
|
||||
|
||||
.. |start-h3| raw:: html
|
||||
|
||||
<h3>
|
||||
|
||||
.. |end-h3| raw:: html
|
||||
|
||||
</h3>
|
||||
|
||||
=============
|
||||
resiliencehub
|
||||
=============
|
||||
|
||||
|start-h3| Implemented features for this service |end-h3|
|
||||
|
||||
- [ ] add_draft_app_version_resource_mappings
|
||||
- [ ] batch_update_recommendation_status
|
||||
- [X] create_app
|
||||
|
||||
The ClientToken-parameter is not yet implemented
|
||||
|
||||
|
||||
- [ ] create_app_version_app_component
|
||||
- [ ] create_app_version_resource
|
||||
- [ ] create_recommendation_template
|
||||
- [X] create_resiliency_policy
|
||||
|
||||
The ClientToken-parameter is not yet implemented
|
||||
|
||||
|
||||
- [ ] delete_app
|
||||
- [ ] delete_app_assessment
|
||||
- [ ] delete_app_input_source
|
||||
- [ ] delete_app_version_app_component
|
||||
- [ ] delete_app_version_resource
|
||||
- [ ] delete_recommendation_template
|
||||
- [ ] delete_resiliency_policy
|
||||
- [X] describe_app
|
||||
- [ ] describe_app_assessment
|
||||
- [ ] describe_app_version
|
||||
- [ ] describe_app_version_app_component
|
||||
- [ ] describe_app_version_resource
|
||||
- [ ] describe_app_version_resources_resolution_status
|
||||
- [ ] describe_app_version_template
|
||||
- [ ] describe_draft_app_version_resources_import_status
|
||||
- [X] describe_resiliency_policy
|
||||
- [ ] import_resources_to_draft_app_version
|
||||
- [ ] list_alarm_recommendations
|
||||
- [ ] list_app_assessment_compliance_drifts
|
||||
- [X] list_app_assessments
|
||||
- [ ] list_app_component_compliances
|
||||
- [ ] list_app_component_recommendations
|
||||
- [ ] list_app_input_sources
|
||||
- [ ] list_app_version_app_components
|
||||
- [ ] list_app_version_resource_mappings
|
||||
- [ ] list_app_version_resources
|
||||
- [ ] list_app_versions
|
||||
- [X] list_apps
|
||||
|
||||
The FromAssessmentTime/ToAssessmentTime-parameters are not yet implemented
|
||||
|
||||
|
||||
- [ ] list_recommendation_templates
|
||||
- [X] list_resiliency_policies
|
||||
- [ ] list_sop_recommendations
|
||||
- [ ] list_suggested_resiliency_policies
|
||||
- [X] list_tags_for_resource
|
||||
- [ ] list_test_recommendations
|
||||
- [ ] list_unsupported_app_version_resources
|
||||
- [ ] publish_app_version
|
||||
- [ ] put_draft_app_version_template
|
||||
- [ ] remove_draft_app_version_resource_mappings
|
||||
- [ ] resolve_app_version_resources
|
||||
- [ ] start_app_assessment
|
||||
- [X] tag_resource
|
||||
- [X] untag_resource
|
||||
- [ ] update_app
|
||||
- [ ] update_app_version
|
||||
- [ ] update_app_version_app_component
|
||||
- [ ] update_app_version_resource
|
||||
- [ ] update_resiliency_policy
|
||||
|
@ -131,6 +131,7 @@ backend_url_patterns = [
|
||||
("redshift", re.compile("https?://redshift\\.(.+)\\.amazonaws\\.com")),
|
||||
("redshiftdata", re.compile("https?://redshift-data\\.(.+)\\.amazonaws\\.com")),
|
||||
("rekognition", re.compile("https?://rekognition\\.(.+)\\.amazonaws\\.com")),
|
||||
("resiliencehub", re.compile("https?://resiliencehub\\.(.+)\\.amazonaws\\.com")),
|
||||
(
|
||||
"resourcegroups",
|
||||
re.compile("https?://resource-groups(-fips)?\\.(.+)\\.amazonaws.com"),
|
||||
@ -138,14 +139,11 @@ backend_url_patterns = [
|
||||
("resourcegroupstaggingapi", re.compile("https?://tagging\\.(.+)\\.amazonaws.com")),
|
||||
("robomaker", re.compile("https?://robomaker\\.(.+)\\.amazonaws\\.com")),
|
||||
("route53", re.compile("https?://route53(\\..+)?\\.amazonaws.com")),
|
||||
("route53domains", re.compile("https?://route53domains\\.(.+)\\.amazonaws\\.com")),
|
||||
(
|
||||
"route53resolver",
|
||||
re.compile("https?://route53resolver\\.(.+)\\.amazonaws\\.com"),
|
||||
),
|
||||
(
|
||||
"route53domains",
|
||||
re.compile("https?://route53domains\\.(.+)\\.amazonaws\\.com"),
|
||||
),
|
||||
("s3", re.compile("https?://s3(?!-control)(.*)\\.amazonaws.com")),
|
||||
(
|
||||
"s3",
|
||||
|
@ -104,6 +104,7 @@ if TYPE_CHECKING:
|
||||
from moto.redshift.models import RedshiftBackend
|
||||
from moto.redshiftdata.models import RedshiftDataAPIServiceBackend
|
||||
from moto.rekognition.models import RekognitionBackend
|
||||
from moto.resiliencehub.models import ResilienceHubBackend
|
||||
from moto.resourcegroups.models import ResourceGroupsBackend
|
||||
from moto.resourcegroupstaggingapi.models import ResourceGroupsTaggingAPIBackend
|
||||
from moto.robomaker.models import RoboMakerBackend
|
||||
@ -262,6 +263,7 @@ SERVICE_NAMES = Union[
|
||||
"Literal['redshift']",
|
||||
"Literal['redshift-data']",
|
||||
"Literal['rekognition']",
|
||||
"Literal['resiliencehub']",
|
||||
"Literal['resource-groups']",
|
||||
"Literal['resourcegroupstaggingapi']",
|
||||
"Literal['robomaker']",
|
||||
@ -498,6 +500,8 @@ def get_backend(name: "Literal['redshift-data']") -> "BackendDict[RedshiftDataAP
|
||||
@overload
|
||||
def get_backend(name: "Literal['rekognition']") -> "BackendDict[RekognitionBackend]": ...
|
||||
@overload
|
||||
def get_backend(name: "Literal['resiliencehub']") -> "BackendDict[ResilienceHubBackend]": ...
|
||||
@overload
|
||||
def get_backend(name: "Literal['resource-groups']") -> "BackendDict[ResourceGroupsBackend]": ...
|
||||
@overload
|
||||
def get_backend(name: "Literal['resourcegroupstaggingapi']") -> "BackendDict[ResourceGroupsTaggingAPIBackend]": ...
|
||||
|
1
moto/resiliencehub/__init__.py
Normal file
1
moto/resiliencehub/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .models import resiliencehub_backends # noqa: F401
|
22
moto/resiliencehub/exceptions.py
Normal file
22
moto/resiliencehub/exceptions.py
Normal file
@ -0,0 +1,22 @@
|
||||
"""Exceptions raised by the resiliencehub service."""
|
||||
from moto.core.exceptions import JsonRESTError
|
||||
|
||||
|
||||
class ResourceNotFound(JsonRESTError):
|
||||
def __init__(self, msg: str):
|
||||
super().__init__("ResourceNotFoundException", msg)
|
||||
|
||||
|
||||
class AppNotFound(ResourceNotFound):
|
||||
def __init__(self, arn: str):
|
||||
super().__init__(f"App not found for appArn {arn}")
|
||||
|
||||
|
||||
class ResiliencyPolicyNotFound(ResourceNotFound):
|
||||
def __init__(self, arn: str):
|
||||
super().__init__(f"ResiliencyPolicy {arn} not found")
|
||||
|
||||
|
||||
class ValidationException(JsonRESTError):
|
||||
def __init__(self, msg: str):
|
||||
super().__init__("ValidationException", msg)
|
212
moto/resiliencehub/models.py
Normal file
212
moto/resiliencehub/models.py
Normal file
@ -0,0 +1,212 @@
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from moto.core.base_backend import BackendDict, BaseBackend
|
||||
from moto.core.common_models import BaseModel
|
||||
from moto.core.utils import unix_time
|
||||
from moto.moto_api._internal import mock_random
|
||||
from moto.utilities.paginator import paginate
|
||||
from moto.utilities.tagging_service import TaggingService
|
||||
|
||||
from .exceptions import AppNotFound, ResiliencyPolicyNotFound
|
||||
|
||||
PAGINATION_MODEL = {
|
||||
"list_apps": {
|
||||
"input_token": "next_token",
|
||||
"limit_key": "max_results",
|
||||
"limit_default": 100,
|
||||
"unique_attribute": "arn",
|
||||
},
|
||||
"list_resiliency_policies": {
|
||||
"input_token": "next_token",
|
||||
"limit_key": "max_results",
|
||||
"limit_default": 100,
|
||||
"unique_attribute": "arn",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
class App(BaseModel):
|
||||
def __init__(
|
||||
self,
|
||||
backend: "ResilienceHubBackend",
|
||||
assessment_schedule: str,
|
||||
description: str,
|
||||
event_subscriptions: List[Dict[str, Any]],
|
||||
name: str,
|
||||
permission_model: Dict[str, Any],
|
||||
policy_arn: str,
|
||||
):
|
||||
self.backend = backend
|
||||
self.arn = f"arn:aws:resiliencehub:{backend.region_name}:{backend.account_id}:app/{mock_random.uuid4()}"
|
||||
self.assessment_schedule = assessment_schedule or "Disabled"
|
||||
self.compliance_status = "NotAssessed"
|
||||
self.description = description
|
||||
self.creation_time = unix_time()
|
||||
self.event_subscriptions = event_subscriptions
|
||||
self.name = name
|
||||
self.permission_model = permission_model
|
||||
self.policy_arn = policy_arn
|
||||
self.resilience_score = 0.0
|
||||
self.status = "Active"
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
resp = {
|
||||
"appArn": self.arn,
|
||||
"assessmentSchedule": self.assessment_schedule,
|
||||
"complianceStatus": self.compliance_status,
|
||||
"creationTime": self.creation_time,
|
||||
"name": self.name,
|
||||
"resilienceScore": self.resilience_score,
|
||||
"status": self.status,
|
||||
"tags": self.backend.list_tags_for_resource(self.arn),
|
||||
}
|
||||
if self.description is not None:
|
||||
resp["description"] = self.description
|
||||
if self.event_subscriptions:
|
||||
resp["eventSubscriptions"] = self.event_subscriptions
|
||||
if self.permission_model:
|
||||
resp["permissionModel"] = self.permission_model
|
||||
if self.policy_arn:
|
||||
resp["policyArn"] = self.policy_arn
|
||||
return resp
|
||||
|
||||
|
||||
class Policy(BaseModel):
|
||||
def __init__(
|
||||
self,
|
||||
backend: "ResilienceHubBackend",
|
||||
policy: Dict[str, Dict[str, int]],
|
||||
policy_name: str,
|
||||
data_location_constraint: str,
|
||||
policy_description: str,
|
||||
tier: str,
|
||||
):
|
||||
self.arn = f"arn:aws:resiliencehub:{backend.region_name}:{backend.account_id}:resiliency-policy/{mock_random.uuid4()}"
|
||||
self.backend = backend
|
||||
self.data_location_constraint = data_location_constraint
|
||||
self.creation_time = unix_time()
|
||||
self.policy = policy
|
||||
self.policy_description = policy_description
|
||||
self.policy_name = policy_name
|
||||
self.tier = tier
|
||||
|
||||
def to_json(self) -> Dict[str, Any]:
|
||||
resp = {
|
||||
"creationTime": self.creation_time,
|
||||
"policy": self.policy,
|
||||
"policyArn": self.arn,
|
||||
"policyName": self.policy_name,
|
||||
"tags": self.backend.list_tags_for_resource(self.arn),
|
||||
"tier": self.tier,
|
||||
}
|
||||
if self.data_location_constraint:
|
||||
resp["dataLocationConstraint"] = self.data_location_constraint
|
||||
if self.policy_description:
|
||||
resp["policyDescription"] = self.policy_description
|
||||
return resp
|
||||
|
||||
|
||||
class ResilienceHubBackend(BaseBackend):
|
||||
def __init__(self, region_name: str, account_id: str):
|
||||
super().__init__(region_name, account_id)
|
||||
self.apps: Dict[str, App] = dict()
|
||||
self.policies: Dict[str, Policy] = dict()
|
||||
self.tagger = TaggingService()
|
||||
|
||||
def create_app(
|
||||
self,
|
||||
assessment_schedule: str,
|
||||
description: str,
|
||||
event_subscriptions: List[Dict[str, Any]],
|
||||
name: str,
|
||||
permission_model: Dict[str, Any],
|
||||
policy_arn: str,
|
||||
tags: Dict[str, str],
|
||||
) -> App:
|
||||
"""
|
||||
The ClientToken-parameter is not yet implemented
|
||||
"""
|
||||
app = App(
|
||||
backend=self,
|
||||
assessment_schedule=assessment_schedule,
|
||||
description=description,
|
||||
event_subscriptions=event_subscriptions,
|
||||
name=name,
|
||||
permission_model=permission_model,
|
||||
policy_arn=policy_arn,
|
||||
)
|
||||
self.apps[app.arn] = app
|
||||
self.tag_resource(app.arn, tags)
|
||||
return app
|
||||
|
||||
def create_resiliency_policy(
|
||||
self,
|
||||
data_location_constraint: str,
|
||||
policy: Dict[str, Any],
|
||||
policy_description: str,
|
||||
policy_name: str,
|
||||
tags: Dict[str, str],
|
||||
tier: str,
|
||||
) -> Policy:
|
||||
"""
|
||||
The ClientToken-parameter is not yet implemented
|
||||
"""
|
||||
pol = Policy(
|
||||
backend=self,
|
||||
data_location_constraint=data_location_constraint,
|
||||
policy=policy,
|
||||
policy_description=policy_description,
|
||||
policy_name=policy_name,
|
||||
tier=tier,
|
||||
)
|
||||
self.policies[pol.arn] = pol
|
||||
self.tag_resource(pol.arn, tags)
|
||||
return pol
|
||||
|
||||
@paginate(PAGINATION_MODEL)
|
||||
def list_apps(self, app_arn: str, name: str, reverse_order: bool) -> List[App]:
|
||||
"""
|
||||
The FromAssessmentTime/ToAssessmentTime-parameters are not yet implemented
|
||||
"""
|
||||
if name:
|
||||
app_summaries = [a for a in self.apps.values() if a.name == name]
|
||||
elif app_arn:
|
||||
app_summaries = [self.apps[app_arn]]
|
||||
else:
|
||||
app_summaries = list(self.apps.values())
|
||||
if reverse_order:
|
||||
app_summaries.reverse()
|
||||
return app_summaries
|
||||
|
||||
def list_app_assessments(self) -> List[str]:
|
||||
return []
|
||||
|
||||
def describe_app(self, app_arn: str) -> App:
|
||||
if app_arn not in self.apps:
|
||||
raise AppNotFound(app_arn)
|
||||
return self.apps[app_arn]
|
||||
|
||||
@paginate(pagination_model=PAGINATION_MODEL)
|
||||
def list_resiliency_policies(self, policy_name: str) -> List[Policy]:
|
||||
if policy_name:
|
||||
return [p for p in self.policies.values() if p.policy_name == policy_name]
|
||||
return list(self.policies.values())
|
||||
|
||||
def describe_resiliency_policy(self, policy_arn: str) -> Policy:
|
||||
if policy_arn not in self.policies:
|
||||
raise ResiliencyPolicyNotFound(policy_arn)
|
||||
return self.policies[policy_arn]
|
||||
|
||||
def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
|
||||
self.tagger.tag_resource(
|
||||
resource_arn, TaggingService.convert_dict_to_tags_input(tags)
|
||||
)
|
||||
|
||||
def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
|
||||
self.tagger.untag_resource_using_names(resource_arn, tag_keys)
|
||||
|
||||
def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
|
||||
return self.tagger.get_tag_dict_for_resource(resource_arn)
|
||||
|
||||
|
||||
resiliencehub_backends = BackendDict(ResilienceHubBackend, "resiliencehub")
|
159
moto/resiliencehub/responses.py
Normal file
159
moto/resiliencehub/responses.py
Normal file
@ -0,0 +1,159 @@
|
||||
import json
|
||||
from typing import Any
|
||||
from urllib.parse import unquote
|
||||
|
||||
from moto.core.responses import BaseResponse
|
||||
|
||||
from .exceptions import ValidationException
|
||||
from .models import ResilienceHubBackend, resiliencehub_backends
|
||||
|
||||
|
||||
class ResilienceHubResponse(BaseResponse):
|
||||
def tags(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return]
|
||||
self.setup_class(request, full_url, headers)
|
||||
if request.method == "GET":
|
||||
return self.list_tags_for_resource()
|
||||
if request.method == "POST":
|
||||
return self.tag_resource()
|
||||
if request.method == "DELETE":
|
||||
return self.untag_resource()
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__(service_name="resiliencehub")
|
||||
|
||||
@property
|
||||
def resiliencehub_backend(self) -> ResilienceHubBackend:
|
||||
return resiliencehub_backends[self.current_account][self.region]
|
||||
|
||||
def create_app(self) -> str:
|
||||
params = json.loads(self.body)
|
||||
assessment_schedule = params.get("assessmentSchedule")
|
||||
description = params.get("description")
|
||||
event_subscriptions = params.get("eventSubscriptions")
|
||||
name = params.get("name")
|
||||
permission_model = params.get("permissionModel")
|
||||
policy_arn = params.get("policyArn")
|
||||
tags = params.get("tags")
|
||||
app = self.resiliencehub_backend.create_app(
|
||||
assessment_schedule=assessment_schedule,
|
||||
description=description,
|
||||
event_subscriptions=event_subscriptions,
|
||||
name=name,
|
||||
permission_model=permission_model,
|
||||
policy_arn=policy_arn,
|
||||
tags=tags,
|
||||
)
|
||||
return json.dumps(dict(app=app.to_json()))
|
||||
|
||||
def create_resiliency_policy(self) -> str:
|
||||
params = json.loads(self.body)
|
||||
data_location_constraint = params.get("dataLocationConstraint")
|
||||
policy = params.get("policy")
|
||||
policy_description = params.get("policyDescription")
|
||||
policy_name = params.get("policyName")
|
||||
tags = params.get("tags")
|
||||
tier = params.get("tier")
|
||||
|
||||
required_policy_types = ["Software", "Hardware", "AZ"]
|
||||
all_policy_types = required_policy_types + ["Region"]
|
||||
if any((p_type not in all_policy_types for p_type in policy.keys())):
|
||||
raise ValidationException(
|
||||
"1 validation error detected: Value at 'policy' failed to satisfy constraint: Map keys must satisfy constraint: [Member must satisfy enum value set: [Software, Hardware, Region, AZ]]"
|
||||
)
|
||||
for required_key in required_policy_types:
|
||||
if required_key not in policy.keys():
|
||||
raise ValidationException(
|
||||
f"FailureType {required_key.upper()} does not exist"
|
||||
)
|
||||
|
||||
policy = self.resiliencehub_backend.create_resiliency_policy(
|
||||
data_location_constraint=data_location_constraint,
|
||||
policy=policy,
|
||||
policy_description=policy_description,
|
||||
policy_name=policy_name,
|
||||
tags=tags,
|
||||
tier=tier,
|
||||
)
|
||||
return json.dumps(dict(policy=policy.to_json()))
|
||||
|
||||
def list_apps(self) -> str:
|
||||
params = self._get_params()
|
||||
app_arn = params.get("appArn")
|
||||
max_results = int(params.get("maxResults", 100))
|
||||
name = params.get("name")
|
||||
next_token = params.get("nextToken")
|
||||
reverse_order = params.get("reverseOrder") == "true"
|
||||
app_summaries, next_token = self.resiliencehub_backend.list_apps(
|
||||
app_arn=app_arn,
|
||||
max_results=max_results,
|
||||
name=name,
|
||||
next_token=next_token,
|
||||
reverse_order=reverse_order,
|
||||
)
|
||||
return json.dumps(
|
||||
dict(
|
||||
appSummaries=[a.to_json() for a in app_summaries], nextToken=next_token
|
||||
)
|
||||
)
|
||||
|
||||
def list_app_assessments(self) -> str:
|
||||
summaries = self.resiliencehub_backend.list_app_assessments()
|
||||
return json.dumps(dict(assessmentSummaries=summaries))
|
||||
|
||||
def describe_app(self) -> str:
|
||||
params = json.loads(self.body)
|
||||
app_arn = params.get("appArn")
|
||||
app = self.resiliencehub_backend.describe_app(
|
||||
app_arn=app_arn,
|
||||
)
|
||||
return json.dumps(dict(app=app.to_json()))
|
||||
|
||||
def list_resiliency_policies(self) -> str:
|
||||
params = self._get_params()
|
||||
max_results = int(params.get("maxResults", 100))
|
||||
next_token = params.get("nextToken")
|
||||
policy_name = params.get("policyName")
|
||||
(
|
||||
resiliency_policies,
|
||||
next_token,
|
||||
) = self.resiliencehub_backend.list_resiliency_policies(
|
||||
max_results=max_results,
|
||||
next_token=next_token,
|
||||
policy_name=policy_name,
|
||||
)
|
||||
policies = [p.to_json() for p in resiliency_policies]
|
||||
return json.dumps(dict(nextToken=next_token, resiliencyPolicies=policies))
|
||||
|
||||
def describe_resiliency_policy(self) -> str:
|
||||
params = json.loads(self.body)
|
||||
policy_arn = params.get("policyArn")
|
||||
policy = self.resiliencehub_backend.describe_resiliency_policy(
|
||||
policy_arn=policy_arn,
|
||||
)
|
||||
return json.dumps(dict(policy=policy.to_json()))
|
||||
|
||||
def tag_resource(self) -> str:
|
||||
params = json.loads(self.body)
|
||||
resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1])
|
||||
tags = params.get("tags")
|
||||
self.resiliencehub_backend.tag_resource(
|
||||
resource_arn=resource_arn,
|
||||
tags=tags,
|
||||
)
|
||||
return "{}"
|
||||
|
||||
def untag_resource(self) -> str:
|
||||
resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1])
|
||||
tag_keys = self.querystring.get("tagKeys", [])
|
||||
self.resiliencehub_backend.untag_resource(
|
||||
resource_arn=resource_arn,
|
||||
tag_keys=tag_keys,
|
||||
)
|
||||
return "{}"
|
||||
|
||||
def list_tags_for_resource(self) -> str:
|
||||
resource_arn = unquote(self.uri.split("/tags/")[-1])
|
||||
tags = self.resiliencehub_backend.list_tags_for_resource(
|
||||
resource_arn=resource_arn,
|
||||
)
|
||||
return json.dumps(dict(tags=tags))
|
21
moto/resiliencehub/urls.py
Normal file
21
moto/resiliencehub/urls.py
Normal file
@ -0,0 +1,21 @@
|
||||
"""resiliencehub base URL and path."""
|
||||
from .responses import ResilienceHubResponse
|
||||
|
||||
url_bases = [
|
||||
r"https?://resiliencehub\.(.+)\.amazonaws\.com",
|
||||
]
|
||||
|
||||
url_paths = {
|
||||
"{0}/create-app$": ResilienceHubResponse.dispatch,
|
||||
"{0}/create-resiliency-policy$": ResilienceHubResponse.dispatch,
|
||||
"{0}/describe-app$": ResilienceHubResponse.dispatch,
|
||||
"{0}/describe-resiliency-policy$": ResilienceHubResponse.dispatch,
|
||||
"{0}/list-apps$": ResilienceHubResponse.dispatch,
|
||||
"{0}/list-app-assessments$": ResilienceHubResponse.dispatch,
|
||||
"{0}/list-resiliency-policies$": ResilienceHubResponse.dispatch,
|
||||
"{0}/tags/.+$": ResilienceHubResponse.dispatch,
|
||||
"{0}/tags/(?P<arn_prefix>[^/]+)/(?P<workspace_id>[^/]+)$": ResilienceHubResponse.method_dispatch(
|
||||
ResilienceHubResponse.tags # type: ignore
|
||||
),
|
||||
"{0}/.*$": ResilienceHubResponse.dispatch,
|
||||
}
|
0
tests/test_resiliencehub/__init__.py
Normal file
0
tests/test_resiliencehub/__init__.py
Normal file
250
tests/test_resiliencehub/test_resiliencehub.py
Normal file
250
tests/test_resiliencehub/test_resiliencehub.py
Normal file
@ -0,0 +1,250 @@
|
||||
from uuid import uuid4
|
||||
|
||||
import boto3
|
||||
import pytest
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from moto import mock_aws
|
||||
|
||||
# See our Development Tips on writing tests for hints on how to write good tests:
|
||||
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
|
||||
|
||||
valid_resiliency_policy = {
|
||||
"Software": {"rpoInSecs": 1, "rtoInSecs": 1},
|
||||
"Hardware": {"rpoInSecs": 2, "rtoInSecs": 2},
|
||||
"AZ": {"rpoInSecs": 3, "rtoInSecs": 3},
|
||||
}
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_create_app():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-2")
|
||||
app = client.create_app(name="myapp")["app"]
|
||||
assert app["assessmentSchedule"] == "Disabled"
|
||||
assert app["complianceStatus"] == "NotAssessed"
|
||||
assert app["creationTime"]
|
||||
assert app["name"] == "myapp"
|
||||
assert app["status"] == "Active"
|
||||
assert app["tags"] == {}
|
||||
|
||||
app2 = client.describe_app(appArn=app["appArn"])["app"]
|
||||
assert app == app2
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_create_app_advanced():
|
||||
event_subs = [
|
||||
{
|
||||
"eventType": "ScheduledAssessmentFailure",
|
||||
"name": "some event",
|
||||
"snsTopicArn": "some sns arn",
|
||||
}
|
||||
]
|
||||
perm_model = {
|
||||
"crossAccountRoleArns": ["arn1", "arn2"],
|
||||
"invokerRoleName": "irn",
|
||||
"type": "Rolebased",
|
||||
}
|
||||
|
||||
client = boto3.client("resiliencehub", region_name="us-east-2")
|
||||
app = client.create_app(
|
||||
name="myapp",
|
||||
assessmentSchedule="Daily",
|
||||
description="some desc",
|
||||
eventSubscriptions=event_subs,
|
||||
permissionModel=perm_model,
|
||||
policyArn="some policy arn",
|
||||
)["app"]
|
||||
assert app["assessmentSchedule"] == "Daily"
|
||||
assert app["description"] == "some desc"
|
||||
assert app["eventSubscriptions"] == event_subs
|
||||
assert app["permissionModel"] == perm_model
|
||||
assert app["policyArn"] == "some policy arn"
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_describe_unknown_app():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-1")
|
||||
app_arn = f"arn:aws:resiliencehub:us-east-1:486285699788:app/{str(uuid4())}"
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.describe_app(appArn=app_arn)
|
||||
err = exc.value.response["Error"]
|
||||
assert err["Code"] == "ResourceNotFoundException"
|
||||
assert err["Message"] == f"App not found for appArn {app_arn}"
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_create_resilience_policy():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-1")
|
||||
|
||||
policy = client.create_resiliency_policy(
|
||||
policy=valid_resiliency_policy, policyName="polname", tier="NonCritical"
|
||||
)["policy"]
|
||||
assert policy["creationTime"]
|
||||
assert policy["policy"] == valid_resiliency_policy
|
||||
assert policy["policyName"] == "polname"
|
||||
assert policy["tags"] == {}
|
||||
assert policy["tier"] == "NonCritical"
|
||||
|
||||
policy2 = client.describe_resiliency_policy(policyArn=policy["policyArn"])["policy"]
|
||||
assert policy == policy2
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_create_resilience_policy_advanced():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-1")
|
||||
|
||||
policy = client.create_resiliency_policy(
|
||||
dataLocationConstraint="AnyLocation",
|
||||
policy=valid_resiliency_policy,
|
||||
policyName="polname",
|
||||
policyDescription="test policy",
|
||||
tier="NonCritical",
|
||||
)["policy"]
|
||||
assert policy["dataLocationConstraint"] == "AnyLocation"
|
||||
assert policy["policyDescription"] == "test policy"
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_create_resilience_policy_missing_types():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-1")
|
||||
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.create_resiliency_policy(
|
||||
policy={"Software": {"rpoInSecs": 1, "rtoInSecs": 1}},
|
||||
policyName="polname",
|
||||
tier="NonCritical",
|
||||
)
|
||||
err = exc.value.response["Error"]
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == "FailureType HARDWARE does not exist"
|
||||
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.create_resiliency_policy(
|
||||
policy={
|
||||
"Software": {"rpoInSecs": 1, "rtoInSecs": 1},
|
||||
"Hardware": {"rpoInSecs": 2, "rtoInSecs": 2},
|
||||
},
|
||||
policyName="polname",
|
||||
tier="NonCritical",
|
||||
)
|
||||
err = exc.value.response["Error"]
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert err["Message"] == "FailureType AZ does not exist"
|
||||
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.create_resiliency_policy(
|
||||
policy={"Hardware": {"rpoInSecs": 1, "rtoInSecs": 1}},
|
||||
policyName="polname",
|
||||
tier="NonCritical",
|
||||
)
|
||||
err = exc.value.response["Error"]
|
||||
err["Message"] == "FailureType SOFTWARE does not exist"
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_create_resilience_policy_with_unknown_policy_type():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-1")
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.create_resiliency_policy(
|
||||
policy={
|
||||
"st": {"rpoInSecs": 1, "rtoInSecs": 1},
|
||||
},
|
||||
policyName="polname",
|
||||
tier="NonCritical",
|
||||
)
|
||||
err = exc.value.response["Error"]
|
||||
assert err["Code"] == "ValidationException"
|
||||
assert (
|
||||
"Member must satisfy enum value set: [Software, Hardware, Region, AZ]"
|
||||
in err["Message"]
|
||||
)
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_list_apps():
|
||||
client = boto3.client("resiliencehub", region_name="ap-southeast-1")
|
||||
assert client.list_apps()["appSummaries"] == []
|
||||
|
||||
for i in range(5):
|
||||
arn = client.create_app(name=f"app_{i}")["app"]["appArn"]
|
||||
|
||||
app_2 = client.list_apps(name="app_2")["appSummaries"][0]
|
||||
assert app_2["name"] == "app_2"
|
||||
|
||||
app_4 = client.list_apps(appArn=arn)["appSummaries"][0]
|
||||
assert app_4["name"] == "app_4"
|
||||
|
||||
all_apps = client.list_apps()["appSummaries"]
|
||||
assert len(all_apps) == 5
|
||||
assert [a["name"] for a in all_apps] == [
|
||||
"app_0",
|
||||
"app_1",
|
||||
"app_2",
|
||||
"app_3",
|
||||
"app_4",
|
||||
]
|
||||
|
||||
all_apps = client.list_apps(reverseOrder=True)["appSummaries"]
|
||||
assert len(all_apps) == 5
|
||||
assert [a["name"] for a in all_apps] == [
|
||||
"app_4",
|
||||
"app_3",
|
||||
"app_2",
|
||||
"app_1",
|
||||
"app_0",
|
||||
]
|
||||
|
||||
page1 = client.list_apps(maxResults=2)
|
||||
assert len(page1["appSummaries"]) == 2
|
||||
|
||||
page2 = client.list_apps(maxResults=2, nextToken=page1["nextToken"])
|
||||
assert len(page2["appSummaries"]) == 2
|
||||
|
||||
full_page = client.list_apps(nextToken=page1["nextToken"])
|
||||
assert len(full_page["appSummaries"]) == 3
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_list_app_assessments():
|
||||
client = boto3.client("resiliencehub", region_name="ap-southeast-1")
|
||||
assert client.list_app_assessments()["assessmentSummaries"] == []
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_list_resiliency_policies():
|
||||
client = boto3.client("resiliencehub", region_name="ap-southeast-1")
|
||||
assert client.list_resiliency_policies()["resiliencyPolicies"] == []
|
||||
|
||||
for i in range(5):
|
||||
client.create_resiliency_policy(
|
||||
policy=valid_resiliency_policy, policyName=f"policy_{i}", tier="NonCritical"
|
||||
)["policy"]
|
||||
|
||||
assert len(client.list_resiliency_policies()["resiliencyPolicies"]) == 5
|
||||
|
||||
policy2 = client.list_resiliency_policies(policyName="policy_2")[
|
||||
"resiliencyPolicies"
|
||||
][0]
|
||||
policy2["policyName"] == "policy_2"
|
||||
|
||||
page1 = client.list_resiliency_policies(maxResults=2)
|
||||
assert len(page1["resiliencyPolicies"]) == 2
|
||||
|
||||
page2 = client.list_resiliency_policies(maxResults=2, nextToken=page1["nextToken"])
|
||||
assert len(page2["resiliencyPolicies"]) == 2
|
||||
assert page2["nextToken"]
|
||||
|
||||
page_full = client.list_resiliency_policies(nextToken=page1["nextToken"])
|
||||
assert len(page_full["resiliencyPolicies"]) == 3
|
||||
assert "nextToken" not in page_full
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_describe_unknown_resiliency_policy():
|
||||
client = boto3.client("resiliencehub", region_name="eu-west-1")
|
||||
with pytest.raises(ClientError) as exc:
|
||||
client.describe_resiliency_policy(policyArn="unknownarn")
|
||||
err = exc.value.response["Error"]
|
||||
assert err["Code"] == "ResourceNotFoundException"
|
||||
assert err["Message"] == "ResiliencyPolicy unknownarn not found"
|
56
tests/test_resiliencehub/test_resiliencyhub_tagging.py
Normal file
56
tests/test_resiliencehub/test_resiliencyhub_tagging.py
Normal file
@ -0,0 +1,56 @@
|
||||
import boto3
|
||||
|
||||
from moto import mock_aws
|
||||
|
||||
from .test_resiliencehub import valid_resiliency_policy
|
||||
|
||||
# See our Development Tips on writing tests for hints on how to write good tests:
|
||||
# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_app_tagging():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-2")
|
||||
app = client.create_app(name="myapp", tags={"k": "v"})["app"]
|
||||
arn = app["appArn"]
|
||||
assert app["tags"] == {"k": "v"}
|
||||
|
||||
assert app == client.describe_app(appArn=arn)["app"]
|
||||
|
||||
client.tag_resource(resourceArn=arn, tags={"k2": "v2"})
|
||||
|
||||
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {
|
||||
"k": "v",
|
||||
"k2": "v2",
|
||||
}
|
||||
|
||||
client.untag_resource(resourceArn=arn, tagKeys=["k"])
|
||||
|
||||
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k2": "v2"}
|
||||
|
||||
|
||||
@mock_aws
|
||||
def test_policy_tagging():
|
||||
client = boto3.client("resiliencehub", region_name="us-east-2")
|
||||
policy = client.create_resiliency_policy(
|
||||
policy=valid_resiliency_policy,
|
||||
policyName="polname",
|
||||
tier="NonCritical",
|
||||
tags={"k": "v"},
|
||||
)["policy"]
|
||||
arn = policy["policyArn"]
|
||||
|
||||
assert policy["tags"] == {"k": "v"}
|
||||
|
||||
assert policy == client.describe_resiliency_policy(policyArn=arn)["policy"]
|
||||
|
||||
client.tag_resource(resourceArn=arn, tags={"k2": "v2"})
|
||||
|
||||
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {
|
||||
"k": "v",
|
||||
"k2": "v2",
|
||||
}
|
||||
|
||||
client.untag_resource(resourceArn=arn, tagKeys=["k"])
|
||||
|
||||
assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k2": "v2"}
|
Loading…
Reference in New Issue
Block a user