diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md
index 3e91aaee6..6d28d4849 100644
--- a/IMPLEMENTATION_COVERAGE.md
+++ b/IMPLEMENTATION_COVERAGE.md
@@ -6216,6 +6216,66 @@
- [ ] update_stream_processor
+## resiliencehub
+
+18% implemented
+
+- [ ] add_draft_app_version_resource_mappings
+- [ ] batch_update_recommendation_status
+- [X] create_app
+- [ ] create_app_version_app_component
+- [ ] create_app_version_resource
+- [ ] create_recommendation_template
+- [X] create_resiliency_policy
+- [ ] delete_app
+- [ ] delete_app_assessment
+- [ ] delete_app_input_source
+- [ ] delete_app_version_app_component
+- [ ] delete_app_version_resource
+- [ ] delete_recommendation_template
+- [ ] delete_resiliency_policy
+- [X] describe_app
+- [ ] describe_app_assessment
+- [ ] describe_app_version
+- [ ] describe_app_version_app_component
+- [ ] describe_app_version_resource
+- [ ] describe_app_version_resources_resolution_status
+- [ ] describe_app_version_template
+- [ ] describe_draft_app_version_resources_import_status
+- [X] describe_resiliency_policy
+- [ ] import_resources_to_draft_app_version
+- [ ] list_alarm_recommendations
+- [ ] list_app_assessment_compliance_drifts
+- [X] list_app_assessments
+- [ ] list_app_component_compliances
+- [ ] list_app_component_recommendations
+- [ ] list_app_input_sources
+- [ ] list_app_version_app_components
+- [ ] list_app_version_resource_mappings
+- [ ] list_app_version_resources
+- [ ] list_app_versions
+- [X] list_apps
+- [ ] list_recommendation_templates
+- [X] list_resiliency_policies
+- [ ] list_sop_recommendations
+- [ ] list_suggested_resiliency_policies
+- [X] list_tags_for_resource
+- [ ] list_test_recommendations
+- [ ] list_unsupported_app_version_resources
+- [ ] publish_app_version
+- [ ] put_draft_app_version_template
+- [ ] remove_draft_app_version_resource_mappings
+- [ ] resolve_app_version_resources
+- [ ] start_app_assessment
+- [X] tag_resource
+- [X] untag_resource
+- [ ] update_app
+- [ ] update_app_version
+- [ ] update_app_version_app_component
+- [ ] update_app_version_resource
+- [ ] update_resiliency_policy
+
+
## resource-groups
61% implemented
@@ -8159,7 +8219,6 @@
- rbin
- redshift-serverless
- repostspace
-- resiliencehub
- resource-explorer-2
- rolesanywhere
- route53-recovery-cluster
diff --git a/docs/docs/services/resiliencehub.rst b/docs/docs/services/resiliencehub.rst
new file mode 100644
index 000000000..b326081f7
--- /dev/null
+++ b/docs/docs/services/resiliencehub.rst
@@ -0,0 +1,83 @@
+.. _implementedservice_resiliencehub:
+
+.. |start-h3| raw:: html
+
+
+
+.. |end-h3| raw:: html
+
+
+
+=============
+resiliencehub
+=============
+
+|start-h3| Implemented features for this service |end-h3|
+
+- [ ] add_draft_app_version_resource_mappings
+- [ ] batch_update_recommendation_status
+- [X] create_app
+
+ The ClientToken-parameter is not yet implemented
+
+
+- [ ] create_app_version_app_component
+- [ ] create_app_version_resource
+- [ ] create_recommendation_template
+- [X] create_resiliency_policy
+
+ The ClientToken-parameter is not yet implemented
+
+
+- [ ] delete_app
+- [ ] delete_app_assessment
+- [ ] delete_app_input_source
+- [ ] delete_app_version_app_component
+- [ ] delete_app_version_resource
+- [ ] delete_recommendation_template
+- [ ] delete_resiliency_policy
+- [X] describe_app
+- [ ] describe_app_assessment
+- [ ] describe_app_version
+- [ ] describe_app_version_app_component
+- [ ] describe_app_version_resource
+- [ ] describe_app_version_resources_resolution_status
+- [ ] describe_app_version_template
+- [ ] describe_draft_app_version_resources_import_status
+- [X] describe_resiliency_policy
+- [ ] import_resources_to_draft_app_version
+- [ ] list_alarm_recommendations
+- [ ] list_app_assessment_compliance_drifts
+- [X] list_app_assessments
+- [ ] list_app_component_compliances
+- [ ] list_app_component_recommendations
+- [ ] list_app_input_sources
+- [ ] list_app_version_app_components
+- [ ] list_app_version_resource_mappings
+- [ ] list_app_version_resources
+- [ ] list_app_versions
+- [X] list_apps
+
+ The FromAssessmentTime/ToAssessmentTime-parameters are not yet implemented
+
+
+- [ ] list_recommendation_templates
+- [X] list_resiliency_policies
+- [ ] list_sop_recommendations
+- [ ] list_suggested_resiliency_policies
+- [X] list_tags_for_resource
+- [ ] list_test_recommendations
+- [ ] list_unsupported_app_version_resources
+- [ ] publish_app_version
+- [ ] put_draft_app_version_template
+- [ ] remove_draft_app_version_resource_mappings
+- [ ] resolve_app_version_resources
+- [ ] start_app_assessment
+- [X] tag_resource
+- [X] untag_resource
+- [ ] update_app
+- [ ] update_app_version
+- [ ] update_app_version_app_component
+- [ ] update_app_version_resource
+- [ ] update_resiliency_policy
+
diff --git a/moto/backend_index.py b/moto/backend_index.py
index 5e0d4455b..fd17dfc70 100644
--- a/moto/backend_index.py
+++ b/moto/backend_index.py
@@ -131,6 +131,7 @@ backend_url_patterns = [
("redshift", re.compile("https?://redshift\\.(.+)\\.amazonaws\\.com")),
("redshiftdata", re.compile("https?://redshift-data\\.(.+)\\.amazonaws\\.com")),
("rekognition", re.compile("https?://rekognition\\.(.+)\\.amazonaws\\.com")),
+ ("resiliencehub", re.compile("https?://resiliencehub\\.(.+)\\.amazonaws\\.com")),
(
"resourcegroups",
re.compile("https?://resource-groups(-fips)?\\.(.+)\\.amazonaws.com"),
@@ -138,14 +139,11 @@ backend_url_patterns = [
("resourcegroupstaggingapi", re.compile("https?://tagging\\.(.+)\\.amazonaws.com")),
("robomaker", re.compile("https?://robomaker\\.(.+)\\.amazonaws\\.com")),
("route53", re.compile("https?://route53(\\..+)?\\.amazonaws.com")),
+ ("route53domains", re.compile("https?://route53domains\\.(.+)\\.amazonaws\\.com")),
(
"route53resolver",
re.compile("https?://route53resolver\\.(.+)\\.amazonaws\\.com"),
),
- (
- "route53domains",
- re.compile("https?://route53domains\\.(.+)\\.amazonaws\\.com"),
- ),
("s3", re.compile("https?://s3(?!-control)(.*)\\.amazonaws.com")),
(
"s3",
diff --git a/moto/backends.py b/moto/backends.py
index e7d636f1e..64179c190 100644
--- a/moto/backends.py
+++ b/moto/backends.py
@@ -104,6 +104,7 @@ if TYPE_CHECKING:
from moto.redshift.models import RedshiftBackend
from moto.redshiftdata.models import RedshiftDataAPIServiceBackend
from moto.rekognition.models import RekognitionBackend
+ from moto.resiliencehub.models import ResilienceHubBackend
from moto.resourcegroups.models import ResourceGroupsBackend
from moto.resourcegroupstaggingapi.models import ResourceGroupsTaggingAPIBackend
from moto.robomaker.models import RoboMakerBackend
@@ -262,6 +263,7 @@ SERVICE_NAMES = Union[
"Literal['redshift']",
"Literal['redshift-data']",
"Literal['rekognition']",
+ "Literal['resiliencehub']",
"Literal['resource-groups']",
"Literal['resourcegroupstaggingapi']",
"Literal['robomaker']",
@@ -498,6 +500,8 @@ def get_backend(name: "Literal['redshift-data']") -> "BackendDict[RedshiftDataAP
@overload
def get_backend(name: "Literal['rekognition']") -> "BackendDict[RekognitionBackend]": ...
@overload
+def get_backend(name: "Literal['resiliencehub']") -> "BackendDict[ResilienceHubBackend]": ...
+@overload
def get_backend(name: "Literal['resource-groups']") -> "BackendDict[ResourceGroupsBackend]": ...
@overload
def get_backend(name: "Literal['resourcegroupstaggingapi']") -> "BackendDict[ResourceGroupsTaggingAPIBackend]": ...
diff --git a/moto/resiliencehub/__init__.py b/moto/resiliencehub/__init__.py
new file mode 100644
index 000000000..45490c614
--- /dev/null
+++ b/moto/resiliencehub/__init__.py
@@ -0,0 +1 @@
+from .models import resiliencehub_backends # noqa: F401
diff --git a/moto/resiliencehub/exceptions.py b/moto/resiliencehub/exceptions.py
new file mode 100644
index 000000000..88c400049
--- /dev/null
+++ b/moto/resiliencehub/exceptions.py
@@ -0,0 +1,22 @@
+"""Exceptions raised by the resiliencehub service."""
+from moto.core.exceptions import JsonRESTError
+
+
+class ResourceNotFound(JsonRESTError):
+ def __init__(self, msg: str):
+ super().__init__("ResourceNotFoundException", msg)
+
+
+class AppNotFound(ResourceNotFound):
+ def __init__(self, arn: str):
+ super().__init__(f"App not found for appArn {arn}")
+
+
+class ResiliencyPolicyNotFound(ResourceNotFound):
+ def __init__(self, arn: str):
+ super().__init__(f"ResiliencyPolicy {arn} not found")
+
+
+class ValidationException(JsonRESTError):
+ def __init__(self, msg: str):
+ super().__init__("ValidationException", msg)
diff --git a/moto/resiliencehub/models.py b/moto/resiliencehub/models.py
new file mode 100644
index 000000000..173ed2437
--- /dev/null
+++ b/moto/resiliencehub/models.py
@@ -0,0 +1,212 @@
+from typing import Any, Dict, List
+
+from moto.core.base_backend import BackendDict, BaseBackend
+from moto.core.common_models import BaseModel
+from moto.core.utils import unix_time
+from moto.moto_api._internal import mock_random
+from moto.utilities.paginator import paginate
+from moto.utilities.tagging_service import TaggingService
+
+from .exceptions import AppNotFound, ResiliencyPolicyNotFound
+
+PAGINATION_MODEL = {
+ "list_apps": {
+ "input_token": "next_token",
+ "limit_key": "max_results",
+ "limit_default": 100,
+ "unique_attribute": "arn",
+ },
+ "list_resiliency_policies": {
+ "input_token": "next_token",
+ "limit_key": "max_results",
+ "limit_default": 100,
+ "unique_attribute": "arn",
+ },
+}
+
+
+class App(BaseModel):
+ def __init__(
+ self,
+ backend: "ResilienceHubBackend",
+ assessment_schedule: str,
+ description: str,
+ event_subscriptions: List[Dict[str, Any]],
+ name: str,
+ permission_model: Dict[str, Any],
+ policy_arn: str,
+ ):
+ self.backend = backend
+ self.arn = f"arn:aws:resiliencehub:{backend.region_name}:{backend.account_id}:app/{mock_random.uuid4()}"
+ self.assessment_schedule = assessment_schedule or "Disabled"
+ self.compliance_status = "NotAssessed"
+ self.description = description
+ self.creation_time = unix_time()
+ self.event_subscriptions = event_subscriptions
+ self.name = name
+ self.permission_model = permission_model
+ self.policy_arn = policy_arn
+ self.resilience_score = 0.0
+ self.status = "Active"
+
+ def to_json(self) -> Dict[str, Any]:
+ resp = {
+ "appArn": self.arn,
+ "assessmentSchedule": self.assessment_schedule,
+ "complianceStatus": self.compliance_status,
+ "creationTime": self.creation_time,
+ "name": self.name,
+ "resilienceScore": self.resilience_score,
+ "status": self.status,
+ "tags": self.backend.list_tags_for_resource(self.arn),
+ }
+ if self.description is not None:
+ resp["description"] = self.description
+ if self.event_subscriptions:
+ resp["eventSubscriptions"] = self.event_subscriptions
+ if self.permission_model:
+ resp["permissionModel"] = self.permission_model
+ if self.policy_arn:
+ resp["policyArn"] = self.policy_arn
+ return resp
+
+
+class Policy(BaseModel):
+ def __init__(
+ self,
+ backend: "ResilienceHubBackend",
+ policy: Dict[str, Dict[str, int]],
+ policy_name: str,
+ data_location_constraint: str,
+ policy_description: str,
+ tier: str,
+ ):
+ self.arn = f"arn:aws:resiliencehub:{backend.region_name}:{backend.account_id}:resiliency-policy/{mock_random.uuid4()}"
+ self.backend = backend
+ self.data_location_constraint = data_location_constraint
+ self.creation_time = unix_time()
+ self.policy = policy
+ self.policy_description = policy_description
+ self.policy_name = policy_name
+ self.tier = tier
+
+ def to_json(self) -> Dict[str, Any]:
+ resp = {
+ "creationTime": self.creation_time,
+ "policy": self.policy,
+ "policyArn": self.arn,
+ "policyName": self.policy_name,
+ "tags": self.backend.list_tags_for_resource(self.arn),
+ "tier": self.tier,
+ }
+ if self.data_location_constraint:
+ resp["dataLocationConstraint"] = self.data_location_constraint
+ if self.policy_description:
+ resp["policyDescription"] = self.policy_description
+ return resp
+
+
+class ResilienceHubBackend(BaseBackend):
+ def __init__(self, region_name: str, account_id: str):
+ super().__init__(region_name, account_id)
+ self.apps: Dict[str, App] = dict()
+ self.policies: Dict[str, Policy] = dict()
+ self.tagger = TaggingService()
+
+ def create_app(
+ self,
+ assessment_schedule: str,
+ description: str,
+ event_subscriptions: List[Dict[str, Any]],
+ name: str,
+ permission_model: Dict[str, Any],
+ policy_arn: str,
+ tags: Dict[str, str],
+ ) -> App:
+ """
+ The ClientToken-parameter is not yet implemented
+ """
+ app = App(
+ backend=self,
+ assessment_schedule=assessment_schedule,
+ description=description,
+ event_subscriptions=event_subscriptions,
+ name=name,
+ permission_model=permission_model,
+ policy_arn=policy_arn,
+ )
+ self.apps[app.arn] = app
+ self.tag_resource(app.arn, tags)
+ return app
+
+ def create_resiliency_policy(
+ self,
+ data_location_constraint: str,
+ policy: Dict[str, Any],
+ policy_description: str,
+ policy_name: str,
+ tags: Dict[str, str],
+ tier: str,
+ ) -> Policy:
+ """
+ The ClientToken-parameter is not yet implemented
+ """
+ pol = Policy(
+ backend=self,
+ data_location_constraint=data_location_constraint,
+ policy=policy,
+ policy_description=policy_description,
+ policy_name=policy_name,
+ tier=tier,
+ )
+ self.policies[pol.arn] = pol
+ self.tag_resource(pol.arn, tags)
+ return pol
+
+ @paginate(PAGINATION_MODEL)
+ def list_apps(self, app_arn: str, name: str, reverse_order: bool) -> List[App]:
+ """
+ The FromAssessmentTime/ToAssessmentTime-parameters are not yet implemented
+ """
+ if name:
+ app_summaries = [a for a in self.apps.values() if a.name == name]
+ elif app_arn:
+ app_summaries = [self.apps[app_arn]]
+ else:
+ app_summaries = list(self.apps.values())
+ if reverse_order:
+ app_summaries.reverse()
+ return app_summaries
+
+ def list_app_assessments(self) -> List[str]:
+ return []
+
+ def describe_app(self, app_arn: str) -> App:
+ if app_arn not in self.apps:
+ raise AppNotFound(app_arn)
+ return self.apps[app_arn]
+
+ @paginate(pagination_model=PAGINATION_MODEL)
+ def list_resiliency_policies(self, policy_name: str) -> List[Policy]:
+ if policy_name:
+ return [p for p in self.policies.values() if p.policy_name == policy_name]
+ return list(self.policies.values())
+
+ def describe_resiliency_policy(self, policy_arn: str) -> Policy:
+ if policy_arn not in self.policies:
+ raise ResiliencyPolicyNotFound(policy_arn)
+ return self.policies[policy_arn]
+
+ def tag_resource(self, resource_arn: str, tags: Dict[str, str]) -> None:
+ self.tagger.tag_resource(
+ resource_arn, TaggingService.convert_dict_to_tags_input(tags)
+ )
+
+ def untag_resource(self, resource_arn: str, tag_keys: List[str]) -> None:
+ self.tagger.untag_resource_using_names(resource_arn, tag_keys)
+
+ def list_tags_for_resource(self, resource_arn: str) -> Dict[str, str]:
+ return self.tagger.get_tag_dict_for_resource(resource_arn)
+
+
+resiliencehub_backends = BackendDict(ResilienceHubBackend, "resiliencehub")
diff --git a/moto/resiliencehub/responses.py b/moto/resiliencehub/responses.py
new file mode 100644
index 000000000..c9b530613
--- /dev/null
+++ b/moto/resiliencehub/responses.py
@@ -0,0 +1,159 @@
+import json
+from typing import Any
+from urllib.parse import unquote
+
+from moto.core.responses import BaseResponse
+
+from .exceptions import ValidationException
+from .models import ResilienceHubBackend, resiliencehub_backends
+
+
+class ResilienceHubResponse(BaseResponse):
+ def tags(self, request: Any, full_url: str, headers: Any) -> str: # type: ignore[return]
+ self.setup_class(request, full_url, headers)
+ if request.method == "GET":
+ return self.list_tags_for_resource()
+ if request.method == "POST":
+ return self.tag_resource()
+ if request.method == "DELETE":
+ return self.untag_resource()
+
+ def __init__(self) -> None:
+ super().__init__(service_name="resiliencehub")
+
+ @property
+ def resiliencehub_backend(self) -> ResilienceHubBackend:
+ return resiliencehub_backends[self.current_account][self.region]
+
+ def create_app(self) -> str:
+ params = json.loads(self.body)
+ assessment_schedule = params.get("assessmentSchedule")
+ description = params.get("description")
+ event_subscriptions = params.get("eventSubscriptions")
+ name = params.get("name")
+ permission_model = params.get("permissionModel")
+ policy_arn = params.get("policyArn")
+ tags = params.get("tags")
+ app = self.resiliencehub_backend.create_app(
+ assessment_schedule=assessment_schedule,
+ description=description,
+ event_subscriptions=event_subscriptions,
+ name=name,
+ permission_model=permission_model,
+ policy_arn=policy_arn,
+ tags=tags,
+ )
+ return json.dumps(dict(app=app.to_json()))
+
+ def create_resiliency_policy(self) -> str:
+ params = json.loads(self.body)
+ data_location_constraint = params.get("dataLocationConstraint")
+ policy = params.get("policy")
+ policy_description = params.get("policyDescription")
+ policy_name = params.get("policyName")
+ tags = params.get("tags")
+ tier = params.get("tier")
+
+ required_policy_types = ["Software", "Hardware", "AZ"]
+ all_policy_types = required_policy_types + ["Region"]
+ if any((p_type not in all_policy_types for p_type in policy.keys())):
+ raise ValidationException(
+ "1 validation error detected: Value at 'policy' failed to satisfy constraint: Map keys must satisfy constraint: [Member must satisfy enum value set: [Software, Hardware, Region, AZ]]"
+ )
+ for required_key in required_policy_types:
+ if required_key not in policy.keys():
+ raise ValidationException(
+ f"FailureType {required_key.upper()} does not exist"
+ )
+
+ policy = self.resiliencehub_backend.create_resiliency_policy(
+ data_location_constraint=data_location_constraint,
+ policy=policy,
+ policy_description=policy_description,
+ policy_name=policy_name,
+ tags=tags,
+ tier=tier,
+ )
+ return json.dumps(dict(policy=policy.to_json()))
+
+ def list_apps(self) -> str:
+ params = self._get_params()
+ app_arn = params.get("appArn")
+ max_results = int(params.get("maxResults", 100))
+ name = params.get("name")
+ next_token = params.get("nextToken")
+ reverse_order = params.get("reverseOrder") == "true"
+ app_summaries, next_token = self.resiliencehub_backend.list_apps(
+ app_arn=app_arn,
+ max_results=max_results,
+ name=name,
+ next_token=next_token,
+ reverse_order=reverse_order,
+ )
+ return json.dumps(
+ dict(
+ appSummaries=[a.to_json() for a in app_summaries], nextToken=next_token
+ )
+ )
+
+ def list_app_assessments(self) -> str:
+ summaries = self.resiliencehub_backend.list_app_assessments()
+ return json.dumps(dict(assessmentSummaries=summaries))
+
+ def describe_app(self) -> str:
+ params = json.loads(self.body)
+ app_arn = params.get("appArn")
+ app = self.resiliencehub_backend.describe_app(
+ app_arn=app_arn,
+ )
+ return json.dumps(dict(app=app.to_json()))
+
+ def list_resiliency_policies(self) -> str:
+ params = self._get_params()
+ max_results = int(params.get("maxResults", 100))
+ next_token = params.get("nextToken")
+ policy_name = params.get("policyName")
+ (
+ resiliency_policies,
+ next_token,
+ ) = self.resiliencehub_backend.list_resiliency_policies(
+ max_results=max_results,
+ next_token=next_token,
+ policy_name=policy_name,
+ )
+ policies = [p.to_json() for p in resiliency_policies]
+ return json.dumps(dict(nextToken=next_token, resiliencyPolicies=policies))
+
+ def describe_resiliency_policy(self) -> str:
+ params = json.loads(self.body)
+ policy_arn = params.get("policyArn")
+ policy = self.resiliencehub_backend.describe_resiliency_policy(
+ policy_arn=policy_arn,
+ )
+ return json.dumps(dict(policy=policy.to_json()))
+
+ def tag_resource(self) -> str:
+ params = json.loads(self.body)
+ resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1])
+ tags = params.get("tags")
+ self.resiliencehub_backend.tag_resource(
+ resource_arn=resource_arn,
+ tags=tags,
+ )
+ return "{}"
+
+ def untag_resource(self) -> str:
+ resource_arn = unquote(self.parsed_url.path.split("/tags/")[-1])
+ tag_keys = self.querystring.get("tagKeys", [])
+ self.resiliencehub_backend.untag_resource(
+ resource_arn=resource_arn,
+ tag_keys=tag_keys,
+ )
+ return "{}"
+
+ def list_tags_for_resource(self) -> str:
+ resource_arn = unquote(self.uri.split("/tags/")[-1])
+ tags = self.resiliencehub_backend.list_tags_for_resource(
+ resource_arn=resource_arn,
+ )
+ return json.dumps(dict(tags=tags))
diff --git a/moto/resiliencehub/urls.py b/moto/resiliencehub/urls.py
new file mode 100644
index 000000000..a39663688
--- /dev/null
+++ b/moto/resiliencehub/urls.py
@@ -0,0 +1,21 @@
+"""resiliencehub base URL and path."""
+from .responses import ResilienceHubResponse
+
+url_bases = [
+ r"https?://resiliencehub\.(.+)\.amazonaws\.com",
+]
+
+url_paths = {
+ "{0}/create-app$": ResilienceHubResponse.dispatch,
+ "{0}/create-resiliency-policy$": ResilienceHubResponse.dispatch,
+ "{0}/describe-app$": ResilienceHubResponse.dispatch,
+ "{0}/describe-resiliency-policy$": ResilienceHubResponse.dispatch,
+ "{0}/list-apps$": ResilienceHubResponse.dispatch,
+ "{0}/list-app-assessments$": ResilienceHubResponse.dispatch,
+ "{0}/list-resiliency-policies$": ResilienceHubResponse.dispatch,
+ "{0}/tags/.+$": ResilienceHubResponse.dispatch,
+ "{0}/tags/(?P[^/]+)/(?P[^/]+)$": ResilienceHubResponse.method_dispatch(
+ ResilienceHubResponse.tags # type: ignore
+ ),
+ "{0}/.*$": ResilienceHubResponse.dispatch,
+}
diff --git a/tests/test_resiliencehub/__init__.py b/tests/test_resiliencehub/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/test_resiliencehub/test_resiliencehub.py b/tests/test_resiliencehub/test_resiliencehub.py
new file mode 100644
index 000000000..5e470757c
--- /dev/null
+++ b/tests/test_resiliencehub/test_resiliencehub.py
@@ -0,0 +1,250 @@
+from uuid import uuid4
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+from moto import mock_aws
+
+# See our Development Tips on writing tests for hints on how to write good tests:
+# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
+
+valid_resiliency_policy = {
+ "Software": {"rpoInSecs": 1, "rtoInSecs": 1},
+ "Hardware": {"rpoInSecs": 2, "rtoInSecs": 2},
+ "AZ": {"rpoInSecs": 3, "rtoInSecs": 3},
+}
+
+
+@mock_aws
+def test_create_app():
+ client = boto3.client("resiliencehub", region_name="us-east-2")
+ app = client.create_app(name="myapp")["app"]
+ assert app["assessmentSchedule"] == "Disabled"
+ assert app["complianceStatus"] == "NotAssessed"
+ assert app["creationTime"]
+ assert app["name"] == "myapp"
+ assert app["status"] == "Active"
+ assert app["tags"] == {}
+
+ app2 = client.describe_app(appArn=app["appArn"])["app"]
+ assert app == app2
+
+
+@mock_aws
+def test_create_app_advanced():
+ event_subs = [
+ {
+ "eventType": "ScheduledAssessmentFailure",
+ "name": "some event",
+ "snsTopicArn": "some sns arn",
+ }
+ ]
+ perm_model = {
+ "crossAccountRoleArns": ["arn1", "arn2"],
+ "invokerRoleName": "irn",
+ "type": "Rolebased",
+ }
+
+ client = boto3.client("resiliencehub", region_name="us-east-2")
+ app = client.create_app(
+ name="myapp",
+ assessmentSchedule="Daily",
+ description="some desc",
+ eventSubscriptions=event_subs,
+ permissionModel=perm_model,
+ policyArn="some policy arn",
+ )["app"]
+ assert app["assessmentSchedule"] == "Daily"
+ assert app["description"] == "some desc"
+ assert app["eventSubscriptions"] == event_subs
+ assert app["permissionModel"] == perm_model
+ assert app["policyArn"] == "some policy arn"
+
+
+@mock_aws
+def test_describe_unknown_app():
+ client = boto3.client("resiliencehub", region_name="us-east-1")
+ app_arn = f"arn:aws:resiliencehub:us-east-1:486285699788:app/{str(uuid4())}"
+ with pytest.raises(ClientError) as exc:
+ client.describe_app(appArn=app_arn)
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == f"App not found for appArn {app_arn}"
+
+
+@mock_aws
+def test_create_resilience_policy():
+ client = boto3.client("resiliencehub", region_name="us-east-1")
+
+ policy = client.create_resiliency_policy(
+ policy=valid_resiliency_policy, policyName="polname", tier="NonCritical"
+ )["policy"]
+ assert policy["creationTime"]
+ assert policy["policy"] == valid_resiliency_policy
+ assert policy["policyName"] == "polname"
+ assert policy["tags"] == {}
+ assert policy["tier"] == "NonCritical"
+
+ policy2 = client.describe_resiliency_policy(policyArn=policy["policyArn"])["policy"]
+ assert policy == policy2
+
+
+@mock_aws
+def test_create_resilience_policy_advanced():
+ client = boto3.client("resiliencehub", region_name="us-east-1")
+
+ policy = client.create_resiliency_policy(
+ dataLocationConstraint="AnyLocation",
+ policy=valid_resiliency_policy,
+ policyName="polname",
+ policyDescription="test policy",
+ tier="NonCritical",
+ )["policy"]
+ assert policy["dataLocationConstraint"] == "AnyLocation"
+ assert policy["policyDescription"] == "test policy"
+
+
+@mock_aws
+def test_create_resilience_policy_missing_types():
+ client = boto3.client("resiliencehub", region_name="us-east-1")
+
+ with pytest.raises(ClientError) as exc:
+ client.create_resiliency_policy(
+ policy={"Software": {"rpoInSecs": 1, "rtoInSecs": 1}},
+ policyName="polname",
+ tier="NonCritical",
+ )
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "FailureType HARDWARE does not exist"
+
+ with pytest.raises(ClientError) as exc:
+ client.create_resiliency_policy(
+ policy={
+ "Software": {"rpoInSecs": 1, "rtoInSecs": 1},
+ "Hardware": {"rpoInSecs": 2, "rtoInSecs": 2},
+ },
+ policyName="polname",
+ tier="NonCritical",
+ )
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert err["Message"] == "FailureType AZ does not exist"
+
+ with pytest.raises(ClientError) as exc:
+ client.create_resiliency_policy(
+ policy={"Hardware": {"rpoInSecs": 1, "rtoInSecs": 1}},
+ policyName="polname",
+ tier="NonCritical",
+ )
+ err = exc.value.response["Error"]
+ err["Message"] == "FailureType SOFTWARE does not exist"
+
+
+@mock_aws
+def test_create_resilience_policy_with_unknown_policy_type():
+ client = boto3.client("resiliencehub", region_name="us-east-1")
+ with pytest.raises(ClientError) as exc:
+ client.create_resiliency_policy(
+ policy={
+ "st": {"rpoInSecs": 1, "rtoInSecs": 1},
+ },
+ policyName="polname",
+ tier="NonCritical",
+ )
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ValidationException"
+ assert (
+ "Member must satisfy enum value set: [Software, Hardware, Region, AZ]"
+ in err["Message"]
+ )
+
+
+@mock_aws
+def test_list_apps():
+ client = boto3.client("resiliencehub", region_name="ap-southeast-1")
+ assert client.list_apps()["appSummaries"] == []
+
+ for i in range(5):
+ arn = client.create_app(name=f"app_{i}")["app"]["appArn"]
+
+ app_2 = client.list_apps(name="app_2")["appSummaries"][0]
+ assert app_2["name"] == "app_2"
+
+ app_4 = client.list_apps(appArn=arn)["appSummaries"][0]
+ assert app_4["name"] == "app_4"
+
+ all_apps = client.list_apps()["appSummaries"]
+ assert len(all_apps) == 5
+ assert [a["name"] for a in all_apps] == [
+ "app_0",
+ "app_1",
+ "app_2",
+ "app_3",
+ "app_4",
+ ]
+
+ all_apps = client.list_apps(reverseOrder=True)["appSummaries"]
+ assert len(all_apps) == 5
+ assert [a["name"] for a in all_apps] == [
+ "app_4",
+ "app_3",
+ "app_2",
+ "app_1",
+ "app_0",
+ ]
+
+ page1 = client.list_apps(maxResults=2)
+ assert len(page1["appSummaries"]) == 2
+
+ page2 = client.list_apps(maxResults=2, nextToken=page1["nextToken"])
+ assert len(page2["appSummaries"]) == 2
+
+ full_page = client.list_apps(nextToken=page1["nextToken"])
+ assert len(full_page["appSummaries"]) == 3
+
+
+@mock_aws
+def test_list_app_assessments():
+ client = boto3.client("resiliencehub", region_name="ap-southeast-1")
+ assert client.list_app_assessments()["assessmentSummaries"] == []
+
+
+@mock_aws
+def test_list_resiliency_policies():
+ client = boto3.client("resiliencehub", region_name="ap-southeast-1")
+ assert client.list_resiliency_policies()["resiliencyPolicies"] == []
+
+ for i in range(5):
+ client.create_resiliency_policy(
+ policy=valid_resiliency_policy, policyName=f"policy_{i}", tier="NonCritical"
+ )["policy"]
+
+ assert len(client.list_resiliency_policies()["resiliencyPolicies"]) == 5
+
+ policy2 = client.list_resiliency_policies(policyName="policy_2")[
+ "resiliencyPolicies"
+ ][0]
+ policy2["policyName"] == "policy_2"
+
+ page1 = client.list_resiliency_policies(maxResults=2)
+ assert len(page1["resiliencyPolicies"]) == 2
+
+ page2 = client.list_resiliency_policies(maxResults=2, nextToken=page1["nextToken"])
+ assert len(page2["resiliencyPolicies"]) == 2
+ assert page2["nextToken"]
+
+ page_full = client.list_resiliency_policies(nextToken=page1["nextToken"])
+ assert len(page_full["resiliencyPolicies"]) == 3
+ assert "nextToken" not in page_full
+
+
+@mock_aws
+def test_describe_unknown_resiliency_policy():
+ client = boto3.client("resiliencehub", region_name="eu-west-1")
+ with pytest.raises(ClientError) as exc:
+ client.describe_resiliency_policy(policyArn="unknownarn")
+ err = exc.value.response["Error"]
+ assert err["Code"] == "ResourceNotFoundException"
+ assert err["Message"] == "ResiliencyPolicy unknownarn not found"
diff --git a/tests/test_resiliencehub/test_resiliencyhub_tagging.py b/tests/test_resiliencehub/test_resiliencyhub_tagging.py
new file mode 100644
index 000000000..fd3e1258d
--- /dev/null
+++ b/tests/test_resiliencehub/test_resiliencyhub_tagging.py
@@ -0,0 +1,56 @@
+import boto3
+
+from moto import mock_aws
+
+from .test_resiliencehub import valid_resiliency_policy
+
+# See our Development Tips on writing tests for hints on how to write good tests:
+# http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html
+
+
+@mock_aws
+def test_app_tagging():
+ client = boto3.client("resiliencehub", region_name="us-east-2")
+ app = client.create_app(name="myapp", tags={"k": "v"})["app"]
+ arn = app["appArn"]
+ assert app["tags"] == {"k": "v"}
+
+ assert app == client.describe_app(appArn=arn)["app"]
+
+ client.tag_resource(resourceArn=arn, tags={"k2": "v2"})
+
+ assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {
+ "k": "v",
+ "k2": "v2",
+ }
+
+ client.untag_resource(resourceArn=arn, tagKeys=["k"])
+
+ assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k2": "v2"}
+
+
+@mock_aws
+def test_policy_tagging():
+ client = boto3.client("resiliencehub", region_name="us-east-2")
+ policy = client.create_resiliency_policy(
+ policy=valid_resiliency_policy,
+ policyName="polname",
+ tier="NonCritical",
+ tags={"k": "v"},
+ )["policy"]
+ arn = policy["policyArn"]
+
+ assert policy["tags"] == {"k": "v"}
+
+ assert policy == client.describe_resiliency_policy(policyArn=arn)["policy"]
+
+ client.tag_resource(resourceArn=arn, tags={"k2": "v2"})
+
+ assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {
+ "k": "v",
+ "k2": "v2",
+ }
+
+ client.untag_resource(resourceArn=arn, tagKeys=["k"])
+
+ assert client.list_tags_for_resource(resourceArn=arn)["tags"] == {"k2": "v2"}