From fd5d7c18c1df69b93b5e0d79a2b0e5be192ae982 Mon Sep 17 00:00:00 2001 From: Bert Blommers Date: Mon, 29 Jan 2024 20:44:06 +0000 Subject: [PATCH] SecretsManager: Replica Secrets are now supported (#7270) --- IMPLEMENTATION_COVERAGE.md | 6 +- docs/docs/services/secretsmanager.rst | 4 +- moto/secretsmanager/exceptions.py | 7 + moto/secretsmanager/models.py | 230 ++++++++++-- moto/secretsmanager/responses.py | 30 +- moto/ssm/models.py | 2 +- .../test_secretsmanager/test_list_secrets.py | 10 + .../test_secrets_duplication.py | 326 ++++++++++++++++++ 8 files changed, 572 insertions(+), 43 deletions(-) create mode 100644 tests/test_secretsmanager/test_secrets_duplication.py diff --git a/IMPLEMENTATION_COVERAGE.md b/IMPLEMENTATION_COVERAGE.md index 04faa5101..7b96cdf80 100644 --- a/IMPLEMENTATION_COVERAGE.md +++ b/IMPLEMENTATION_COVERAGE.md @@ -7016,7 +7016,7 @@ ## secretsmanager
-78% implemented +86% implemented - [ ] batch_get_secret_value - [X] cancel_rotate_secret @@ -7031,8 +7031,8 @@ - [X] list_secrets - [X] put_resource_policy - [X] put_secret_value -- [ ] remove_regions_from_replication -- [ ] replicate_secret_to_regions +- [X] remove_regions_from_replication +- [X] replicate_secret_to_regions - [X] restore_secret - [X] rotate_secret - [ ] stop_replication_to_replica diff --git a/docs/docs/services/secretsmanager.rst b/docs/docs/services/secretsmanager.rst index a3209a6ea..0fdaad7b5 100644 --- a/docs/docs/services/secretsmanager.rst +++ b/docs/docs/services/secretsmanager.rst @@ -31,8 +31,8 @@ secretsmanager - [X] put_secret_value -- [ ] remove_regions_from_replication -- [ ] replicate_secret_to_regions +- [X] remove_regions_from_replication +- [X] replicate_secret_to_regions - [X] restore_secret - [X] rotate_secret - [ ] stop_replication_to_replica diff --git a/moto/secretsmanager/exceptions.py b/moto/secretsmanager/exceptions.py index 9abd0775c..ac36fd9bf 100644 --- a/moto/secretsmanager/exceptions.py +++ b/moto/secretsmanager/exceptions.py @@ -62,3 +62,10 @@ class InvalidRequestException(SecretsManagerClientError): class ValidationException(SecretsManagerClientError): def __init__(self, message: str): super().__init__("ValidationException", message) + + +class OperationNotPermittedOnReplica(InvalidParameterException): + def __init__(self) -> None: + super().__init__( + "Operation not permitted on a replica secret. Call must be made in primary secret's region." + ) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index e18b3ef99..229576c9d 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -1,7 +1,7 @@ import datetime import json import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union from moto.core.base_backend import BackendDict, BaseBackend from moto.core.common_models import BaseModel @@ -12,6 +12,7 @@ from .exceptions import ( ClientError, InvalidParameterException, InvalidRequestException, + OperationNotPermittedOnReplica, ResourceExistsException, ResourceNotFoundException, SecretHasNoValueException, @@ -27,12 +28,24 @@ from .list_secrets.filters import ( ) from .utils import get_secret_name_from_partial_arn, random_password, secret_arn + +def filter_primary_region(secret: "FakeSecret", values: List[str]) -> bool: + if isinstance(secret, FakeSecret): + return len(secret.replicas) > 0 and secret.region in values + elif isinstance(secret, ReplicaSecret): + return secret.source.region in values + + _filter_functions = { "all": filter_all, "name": name_filter, "description": description_filter, "tag-key": tag_key, "tag-value": tag_value, + "primary-region": filter_primary_region, + # Other services do not create secrets in Moto (yet) + # So if you're looking for any Secrets owned by a Service, you'll never get any results + "owning-service": lambda x, y: False, } @@ -40,7 +53,9 @@ def filter_keys() -> List[str]: return list(_filter_functions.keys()) -def _matches(secret: "FakeSecret", filters: List[Dict[str, Any]]) -> bool: +def _matches( + secret: Union["FakeSecret", "ReplicaSecret"], filters: List[Dict[str, Any]] +) -> bool: is_match = True for f in filters: @@ -72,10 +87,14 @@ class FakeSecret: version_stages: Optional[List[str]] = None, last_changed_date: Optional[int] = None, created_date: Optional[int] = None, + replica_regions: Optional[List[Dict[str, str]]] = None, + force_overwrite: bool = False, ): self.secret_id = secret_id self.name = secret_id self.arn = secret_arn(account_id, region_name, secret_id) + self.account_id = account_id + self.region = region_name self.secret_string = secret_string self.secret_binary = secret_binary self.description = description @@ -101,6 +120,36 @@ class FakeSecret: else: self.set_default_version_id(None) + self.replicas = self.create_replicas( + replica_regions or [], force_overwrite=force_overwrite + ) + + def create_replicas( + self, replica_regions: List[Dict[str, str]], force_overwrite: bool + ) -> List["ReplicaSecret"]: + # Validate first, before we create anything + for replica_config in replica_regions or []: + if replica_config["Region"] == self.region: + raise InvalidParameterException("Invalid replica region.") + + replicas: List[ReplicaSecret] = [] + for replica_config in replica_regions or []: + replica_region = replica_config["Region"] + backend = secretsmanager_backends[self.account_id][replica_region] + if self.name in backend.secrets: + if force_overwrite: + backend.secrets.pop(self.name) + replica = ReplicaSecret(self, replica_region) + backend.secrets[replica.arn] = replica + else: + message = f"Replication failed: Secret name simple already exists in region {backend.region_name}." + replica = ReplicaSecret(self, replica_region, "Failed", message) + else: + replica = ReplicaSecret(self, replica_region) + backend.secrets[replica.arn] = replica + replicas.append(replica) + return replicas + def update( self, description: Optional[str] = None, @@ -162,7 +211,7 @@ class FakeSecret: ) -> str: if not version_id: version_id = self.default_version_id - dct = { + dct: Dict[str, Any] = { "ARN": self.arn, "Name": self.name, } @@ -170,6 +219,8 @@ class FakeSecret: dct["VersionId"] = version_id if version_id and include_version_stages: dct["VersionStages"] = self.versions[version_id]["version_stages"] + if self.replicas: + dct["ReplicationStatus"] = [replica.config for replica in self.replicas] return json.dumps(dct) def to_dict(self) -> Dict[str, Any]: @@ -209,6 +260,8 @@ class FakeSecret: "LastRotatedDate": self.last_rotation_date, } ) + if self.replicas: + dct["ReplicationStatus"] = [replica.config for replica in self.replicas] return dct def _form_version_ids_to_stages(self) -> Dict[str, str]: @@ -219,15 +272,62 @@ class FakeSecret: return version_id_to_stages -class SecretsStore(Dict[str, FakeSecret]): +class ReplicaSecret: + def __init__( + self, + source: FakeSecret, + region: str, + status: Optional[str] = None, + message: Optional[str] = None, + ): + self.source = source + self.arn = source.arn.replace(source.region, region) + self.region = region + self.status = status or "InSync" + self.message = message or "Replication succeeded" + self.has_replica = status is None + self.config = { + "Region": self.region, + "KmsKeyId": "alias/aws/secretsmanager", + "Status": self.status, + "StatusMessage": self.message, + } + + def is_deleted(self) -> bool: + return False + + def to_dict(self) -> Dict[str, Any]: + dct = self.source.to_dict() + dct["ARN"] = self.arn + dct["PrimaryRegion"] = self.source.region + return dct + + @property + def default_version_id(self) -> Optional[str]: + return self.source.default_version_id + + @property + def versions(self) -> Dict[str, Dict[str, Any]]: # type: ignore[misc] + return self.source.versions + + @property + def name(self) -> str: + return self.source.name + + @property + def secret_id(self) -> str: + return self.source.secret_id + + +class SecretsStore(Dict[str, Union[FakeSecret, ReplicaSecret]]): # Parameters to this dictionary can be three possible values: # names, full ARNs, and partial ARNs # Every retrieval method should check which type of input it receives - def __setitem__(self, key: str, value: FakeSecret) -> None: + def __setitem__(self, key: str, value: Union[FakeSecret, ReplicaSecret]) -> None: super().__setitem__(key, value) - def __getitem__(self, key: str) -> FakeSecret: + def __getitem__(self, key: str) -> Union[FakeSecret, ReplicaSecret]: for secret in dict.values(self): if secret.arn == key or secret.name == key: return secret @@ -241,14 +341,14 @@ class SecretsStore(Dict[str, FakeSecret]): name = get_secret_name_from_partial_arn(key) return dict.__contains__(self, name) # type: ignore - def get(self, key: str) -> Optional[FakeSecret]: # type: ignore + def get(self, key: str) -> Optional[Union[FakeSecret, ReplicaSecret]]: # type: ignore for secret in dict.values(self): if secret.arn == key or secret.name == key: return secret name = get_secret_name_from_partial_arn(key) return super().get(name) - def pop(self, key: str) -> Optional[FakeSecret]: # type: ignore + def pop(self, key: str) -> Optional[Union[FakeSecret, ReplicaSecret]]: # type: ignore for secret in dict.values(self): if secret.arn == key or secret.name == key: key = secret.name @@ -295,6 +395,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica if secret.is_deleted(): raise InvalidRequestException( "You tried to perform the operation on a secret that's currently marked deleted." @@ -391,13 +493,16 @@ class SecretsManagerBackend(BaseBackend): if secret_id not in self.secrets: raise SecretNotFoundException() - if self.secrets[secret_id].is_deleted(): + secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica + + if secret.is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: " "You can't perform this operation on the secret because it was marked for deletion." ) - secret = self.secrets[secret_id] tags = secret.tags description = description or secret.description @@ -416,15 +521,15 @@ class SecretsManagerBackend(BaseBackend): def create_secret( self, name: str, - secret_string: Optional[str] = None, - secret_binary: Optional[str] = None, - description: Optional[str] = None, - tags: Optional[List[Dict[str, str]]] = None, - kms_key_id: Optional[str] = None, - client_request_token: Optional[str] = None, + secret_string: Optional[str], + secret_binary: Optional[str], + description: Optional[str], + tags: Optional[List[Dict[str, str]]], + kms_key_id: Optional[str], + client_request_token: Optional[str], + replica_regions: List[Dict[str, str]], + force_overwrite: bool, ) -> str: - - # error if secret exists if name in self.secrets.keys(): raise ResourceExistsException( "A resource with the ID you requested already exists." @@ -438,6 +543,8 @@ class SecretsManagerBackend(BaseBackend): tags=tags, kms_key_id=kms_key_id, version_id=client_request_token, + replica_regions=replica_regions, + force_overwrite=force_overwrite, ) return secret.to_short_dict(include_version_id=new_version) @@ -452,6 +559,8 @@ class SecretsManagerBackend(BaseBackend): kms_key_id: Optional[str] = None, version_id: Optional[str] = None, version_stages: Optional[List[str]] = None, + replica_regions: Optional[List[Dict[str, str]]] = None, + force_overwrite: bool = False, ) -> Tuple[FakeSecret, bool]: if version_stages is None: @@ -475,6 +584,8 @@ class SecretsManagerBackend(BaseBackend): update_time = int(time.time()) if secret_id in self.secrets: secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica secret.update(description, tags, kms_key_id, last_changed_date=update_time) @@ -498,6 +609,8 @@ class SecretsManagerBackend(BaseBackend): created_date=update_time, version_id=version_id, secret_version=secret_version, + replica_regions=replica_regions, + force_overwrite=force_overwrite, ) self.secrets[secret_id] = secret @@ -516,6 +629,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() else: secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica tags = secret.tags description = secret.description @@ -533,13 +648,11 @@ class SecretsManagerBackend(BaseBackend): return secret.to_short_dict(include_version_stages=True, version_id=version_id) - def describe_secret(self, secret_id: str) -> Dict[str, Any]: + def describe_secret(self, secret_id: str) -> Union[FakeSecret, ReplicaSecret]: if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - secret = self.secrets[secret_id] - - return secret.to_dict() + return self.secrets[secret_id] def rotate_secret( self, @@ -553,7 +666,10 @@ class SecretsManagerBackend(BaseBackend): if not self._is_valid_identifier(secret_id): raise SecretNotFoundException() - if self.secrets[secret_id].is_deleted(): + secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica + if secret.is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \ perform the operation on a secret that's currently marked deleted." @@ -568,17 +684,13 @@ class SecretsManagerBackend(BaseBackend): if rotation_days in rotation_rules: rotation_period = rotation_rules[rotation_days] if rotation_period < 1 or rotation_period > 1000: - msg = ( - "RotationRules.AutomaticallyAfterDays " "must be within 1-1000." - ) + msg = "RotationRules.AutomaticallyAfterDays must be within 1-1000." raise InvalidParameterException(msg) - self.secrets[secret_id].next_rotation_date = int(time.time()) + ( + secret.next_rotation_date = int(time.time()) + ( int(rotation_period) * 86400 ) - secret = self.secrets[secret_id] - # The rotation function must end with the versions of the secret in # one of two states: # @@ -674,7 +786,7 @@ class SecretsManagerBackend(BaseBackend): ) secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"] - self.secrets[secret_id].last_rotation_date = int(time.time()) + secret.last_rotation_date = int(time.time()) return secret.to_short_dict() def get_random_password( @@ -787,7 +899,15 @@ class SecretsManagerBackend(BaseBackend): deletion_date = utcnow() return arn, name, self._unix_time_secs(deletion_date) else: - if self.secrets[secret_id].is_deleted(): + secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica + if len(secret.replicas) > 0: + replica_regions = ",".join([rep.region for rep in secret.replicas]) + msg = f"You can't delete secret {secret_id} that still has replica regions [{replica_regions}]" + raise InvalidParameterException(msg) + + if secret.is_deleted(): raise InvalidRequestException( "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \ perform the operation on a secret that's currently marked deleted." @@ -796,11 +916,10 @@ class SecretsManagerBackend(BaseBackend): deletion_date = utcnow() if force_delete_without_recovery: - secret = self.secrets.pop(secret_id) + self.secrets.pop(secret_id) else: deletion_date += datetime.timedelta(days=recovery_window_in_days or 30) - self.secrets[secret_id].delete(self._unix_time_secs(deletion_date)) - secret = self.secrets.get(secret_id) + secret.delete(self._unix_time_secs(deletion_date)) if not secret: raise SecretNotFoundException() @@ -816,6 +935,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica secret.restore() return secret.arn, secret.name @@ -826,6 +947,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica old_tags = secret.tags for tag in tags: @@ -847,6 +970,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica tags = secret.tags for tag in tags: @@ -912,6 +1037,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica secret.policy = policy return secret.arn, secret.name @@ -920,6 +1047,8 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica resp = { "ARN": secret.arn, "Name": secret.name, @@ -933,8 +1062,41 @@ class SecretsManagerBackend(BaseBackend): raise SecretNotFoundException() secret = self.secrets[secret_id] + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica secret.policy = None return secret.arn, secret.name + def replicate_secret_to_regions( + self, + secret_id: str, + replica_regions: List[Dict[str, str]], + force_overwrite: bool, + ) -> Tuple[str, List[Dict[str, Any]]]: + secret = self.describe_secret(secret_id) + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica + secret.replicas.extend( + secret.create_replicas(replica_regions, force_overwrite=force_overwrite) + ) + statuses = [replica.config for replica in secret.replicas] + return secret_id, statuses + + def remove_regions_from_replication( + self, secret_id: str, replica_regions: List[str] + ) -> Tuple[str, List[Dict[str, str]]]: + secret = self.describe_secret(secret_id) + if isinstance(secret, ReplicaSecret): + raise OperationNotPermittedOnReplica + for replica in secret.replicas.copy(): + if replica.region in replica_regions: + backend = secretsmanager_backends[self.account_id][replica.region] + if replica.has_replica: + dict.pop(backend.secrets, replica.arn) + secret.replicas.remove(replica) + + statuses = [replica.config for replica in secret.replicas] + return secret_id, statuses + secretsmanager_backends = BackendDict(SecretsManagerBackend, "secretsmanager") diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index da38b2814..d4cd60116 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -55,8 +55,11 @@ class SecretsManagerResponse(BaseResponse): secret_binary = self._get_param("SecretBinary") description = self._get_param("Description", if_none="") tags = self._get_param("Tags", if_none=[]) - kms_key_id = self._get_param("KmsKeyId", if_none=None) - client_request_token = self._get_param("ClientRequestToken", if_none=None) + kms_key_id = self._get_param("KmsKeyId") + client_request_token = self._get_param("ClientRequestToken") + replica_regions = self._get_param("AddReplicaRegions", []) + force_overwrite = self._get_bool_param("ForceOverwriteReplicaSecret", False) + return self.backend.create_secret( name=name, secret_string=secret_string, @@ -65,6 +68,8 @@ class SecretsManagerResponse(BaseResponse): tags=tags, kms_key_id=kms_key_id, client_request_token=client_request_token, + replica_regions=replica_regions, + force_overwrite=force_overwrite, ) def update_secret(self) -> str: @@ -108,7 +113,7 @@ class SecretsManagerResponse(BaseResponse): def describe_secret(self) -> str: secret_id = self._get_param("SecretId") secret = self.backend.describe_secret(secret_id=secret_id) - return json.dumps(secret) + return json.dumps(secret.to_dict()) def rotate_secret(self) -> str: client_request_token = self._get_param("ClientRequestToken") @@ -212,3 +217,22 @@ class SecretsManagerResponse(BaseResponse): move_to_version_id=move_to_version_id, ) return json.dumps({"ARN": arn, "Name": name}) + + def replicate_secret_to_regions(self) -> str: + secret_id = self._get_param("SecretId") + replica_regions = self._get_param("AddReplicaRegions", []) + force_overwrite = self._get_bool_param("ForceOverwriteReplicaSecret", False) + + arn, statuses = self.backend.replicate_secret_to_regions( + secret_id, replica_regions, force_overwrite=force_overwrite + ) + return json.dumps({"ARN": arn, "ReplicationStatus": statuses}) + + def remove_regions_from_replication(self) -> str: + secret_id = self._get_param("SecretId") + replica_regions = self._get_param("RemoveReplicaRegions", []) + + arn, statuses = self.backend.remove_regions_from_replication( + secret_id, replica_regions + ) + return json.dumps({"ARN": arn, "ReplicationStatus": statuses}) diff --git a/moto/ssm/models.py b/moto/ssm/models.py index 3e8cbbad6..b3e92af19 100644 --- a/moto/ssm/models.py +++ b/moto/ssm/models.py @@ -132,7 +132,7 @@ class ParameterDict(DefaultDict[str, List["Parameter"]]): def _get_secretsmanager_parameter(self, secret_name: str) -> List["Parameter"]: secrets_backend = secretsmanager_backends[self.account_id][self.region_name] - secret = secrets_backend.describe_secret(secret_name) + secret = secrets_backend.describe_secret(secret_name).to_dict() version_id_to_stage = secret["VersionIdsToStages"] # Sort version ID's so that AWSCURRENT is last sorted_version_ids = [ diff --git a/tests/test_secretsmanager/test_list_secrets.py b/tests/test_secretsmanager/test_list_secrets.py index 3efcfe53e..32dadb7dc 100644 --- a/tests/test_secretsmanager/test_list_secrets.py +++ b/tests/test_secretsmanager/test_list_secrets.py @@ -269,3 +269,13 @@ def test_with_filter_with_negation(): secret_names = list(map(lambda s: s["Name"], secrets["SecretList"])) assert secret_names == ["baz"] + + +@mock_aws +def test_filter_with_owning_service(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + conn.create_secret(Name="foo", SecretString="secret") + + resp = conn.list_secrets(Filters=[{"Key": "owning-service", "Values": ["n/a"]}]) + assert resp["SecretList"] == [] diff --git a/tests/test_secretsmanager/test_secrets_duplication.py b/tests/test_secretsmanager/test_secrets_duplication.py new file mode 100644 index 000000000..1f7b1988b --- /dev/null +++ b/tests/test_secretsmanager/test_secrets_duplication.py @@ -0,0 +1,326 @@ +import boto3 +import pytest +from botocore.exceptions import ClientError + +from moto import mock_aws + + +@mock_aws +def test_filter_with_primary_region(): + conn = boto3.client("secretsmanager", region_name="us-east-1") + + conn.create_secret(Name="simple", SecretString="secret") + conn.create_secret( + Name="dup", SecretString="s", AddReplicaRegions=[{"Region": "us-east-2"}] + ) + + secrets = conn.list_secrets( + Filters=[{"Key": "primary-region", "Values": ["us-east-1"]}] + )["SecretList"] + assert len(secrets) == 1 + + secrets = conn.list_secrets( + Filters=[{"Key": "primary-region", "Values": ["us-east-2"]}] + )["SecretList"] + assert len(secrets) == 0 + + secrets = conn.list_secrets()["SecretList"] + assert len(secrets) == 2 + + # If our entry point is us-east-2, but filter in us-east-1 + # We can see the replicas in us-east-2 that have Primary Secrets in us-east-1 + use2 = boto3.client("secretsmanager", region_name="us-east-2") + secrets = use2.list_secrets( + Filters=[{"Key": "primary-region", "Values": ["us-east-1"]}] + )["SecretList"] + assert len(secrets) == 1 + assert secrets[0]["PrimaryRegion"] == "us-east-1" + + +@mock_aws +def test_create_secret_that_duplicates(): + use1 = boto3.client("secretsmanager", region_name="us-east-1") + use2 = boto3.client("secretsmanager", region_name="us-east-2") + + resp = use1.create_secret( + Name="dup", SecretString="s", AddReplicaRegions=[{"Region": "us-east-2"}] + ) + primary_arn = resp["ARN"] + assert resp["ReplicationStatus"][0]["Region"] == "us-east-2" + assert resp["ReplicationStatus"][0]["Status"] == "InSync" + + primary_secret = use1.describe_secret(SecretId=primary_arn) + assert primary_secret["ARN"] == primary_arn + + replica_arn = primary_arn.replace("us-east-1", "us-east-2") + replica_secret = use2.describe_secret(SecretId=replica_arn) + assert replica_secret["ARN"] == replica_arn + assert replica_secret["Name"] == "dup" + assert replica_secret["VersionIdsToStages"] == primary_secret["VersionIdsToStages"] + + replica_value = use2.get_secret_value(SecretId=replica_arn) + assert replica_value["Name"] == "dup" + assert replica_value["SecretString"] == "s" + + +@mock_aws +def test_create_secret_that_duplicates__in_same_region(): + conn = boto3.client("secretsmanager", region_name="us-east-1") + + with pytest.raises(ClientError) as exc: + conn.create_secret( + Name="dup_same_region", + SecretString="s", + AddReplicaRegions=[{"Region": "us-east-1"}], + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == "Invalid replica region." + + +@mock_aws +def test_replicate_secret(): + us1 = boto3.client("secretsmanager", region_name="us-east-1") + us2 = boto3.client("secretsmanager", region_name="us-east-2") + + resp = us1.create_secret(Name="dup", SecretString="s") + primary_arn = resp["ARN"] + + resp = us1.replicate_secret_to_regions( + SecretId=primary_arn, AddReplicaRegions=[{"Region": "us-east-2"}] + ) + assert resp["ReplicationStatus"][0]["Region"] == "us-east-2" + assert resp["ReplicationStatus"][0]["Status"] == "InSync" + + replica_arn = primary_arn.replace("us-east-1", "us-east-2") + replica_secret = us2.describe_secret(SecretId=replica_arn) + assert replica_secret["ARN"] == replica_arn + + +@mock_aws +def test_replicate_secret__in_same_region(): + conn = boto3.client("secretsmanager", region_name="us-east-1") + + resp = conn.create_secret(Name="dup", SecretString="s") + with pytest.raises(ClientError) as exc: + conn.replicate_secret_to_regions( + SecretId=resp["ARN"], AddReplicaRegions=[{"Region": "us-east-1"}] + ) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == "Invalid replica region." + + +@mock_aws +def test_replicate_secret__existing_secret(): + use1 = boto3.client("secretsmanager", region_name="us-east-1") + use2 = boto3.client("secretsmanager", region_name="us-east-2") + + secret1 = use1.create_secret(Name="dup", SecretString="s1")["ARN"] + secret2 = use2.create_secret(Name="dup", SecretString="s2")["ARN"] + + resp = use1.replicate_secret_to_regions( + SecretId=secret1, AddReplicaRegions=[{"Region": "us-east-2"}] + ) + assert resp["ReplicationStatus"][0]["Region"] == "us-east-2" + assert resp["ReplicationStatus"][0]["Status"] == "Failed" + assert ( + resp["ReplicationStatus"][0]["StatusMessage"] + == "Replication failed: Secret name simple already exists in region us-east-2." + ) + + # us-east-2 still only has one secret + assert len(use2.list_secrets()["SecretList"]) == 1 + + # Both secrets still have original values + assert use1.get_secret_value(SecretId=secret1)["SecretString"] == "s1" + assert use2.get_secret_value(SecretId=secret2)["SecretString"] == "s2" + + +@mock_aws +def test_replicate_secret__overwrite_existing_secret(): + use1 = boto3.client("secretsmanager", region_name="us-east-1") + use2 = boto3.client("secretsmanager", region_name="us-east-2") + + secret1 = use1.create_secret(Name="dup", SecretString="s1")["ARN"] + secret2 = use2.create_secret(Name="dup", SecretString="s2")["ARN"] + + resp = use1.replicate_secret_to_regions( + SecretId=secret1, + AddReplicaRegions=[{"Region": "us-east-2"}], + ForceOverwriteReplicaSecret=True, + ) + assert resp["ReplicationStatus"][0]["Status"] == "InSync" + + # us-east-2 still only has one secret + assert len(use2.list_secrets()["SecretList"]) == 1 + + # Original Secret was yeeted + with pytest.raises(ClientError): + assert use2.get_secret_value(SecretId=secret2)["SecretString"] == "s1" + + # Use ListSecrets to get the new ARN + # And verify it returns a copy of the us-east-1 secret + secret2 = use2.list_secrets()["SecretList"][0]["ARN"] + assert use2.get_secret_value(SecretId=secret2)["SecretString"] == "s1" + + +@mock_aws +def test_remove_regions_from_replication(): + use1 = boto3.client("secretsmanager", region_name="us-east-1") + use2 = boto3.client("secretsmanager", region_name="us-east-2") + usw2 = boto3.client("secretsmanager", region_name="us-west-2") + + arn1 = use1.create_secret(Name="dup", SecretString="s1")["ARN"] + arn2 = arn1.replace("us-east-1", "us-east-2") + arn3 = arn1.replace("us-east-1", "us-west-2") + + # Replicate to multiple regions + use1.replicate_secret_to_regions( + SecretId=arn1, + AddReplicaRegions=[{"Region": "us-east-2"}, {"Region": "us-west-2"}], + ) + + # Replications exist + use2.describe_secret(SecretId=arn2) + usw2.describe_secret(SecretId=arn3) + + # Primary Secret knows about replicas + replications = use1.describe_secret(SecretId=arn1)["ReplicationStatus"] + assert set([rep["Region"] for rep in replications]) == {"us-east-2", "us-west-2"} + + # Remove us-east-2 replication + use1.remove_regions_from_replication( + SecretId=arn1, RemoveReplicaRegions=["us-east-2"] + ) + with pytest.raises(ClientError): + use2.describe_secret(SecretId=arn2) + + # us-west still exists + usw2.describe_secret(SecretId=arn3) + + # Primary Secret no longer knows about us-east-2 + replications = use1.describe_secret(SecretId=arn1)["ReplicationStatus"] + assert set([rep["Region"] for rep in replications]) == {"us-west-2"} + + # Removing region is idempotent - invoking it again does not result in any errors + use1.remove_regions_from_replication( + SecretId=arn1, RemoveReplicaRegions=["us-east-2"] + ) + + +@mock_aws +def test_update_replica(): + use1 = boto3.client("secretsmanager", region_name="us-east-1") + use2 = boto3.client("secretsmanager", region_name="us-east-2") + + arn1 = use1.create_secret( + Name="dup", SecretString="s1", AddReplicaRegions=[{"Region": "us-east-2"}] + )["ARN"] + arn2 = arn1.replace("us-east-1", "us-east-2") + + OP_NOT_PERMITTED = "Operation not permitted on a replica secret. Call must be made in primary secret's region." + + with pytest.raises(ClientError) as exc: + use2.update_secret(SecretId=arn2, SecretString="asdf") + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.put_secret_value(SecretId=arn2, SecretString="asdf") + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.tag_resource(SecretId=arn2, Tags=[{"Key": "k", "Value": "v"}]) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.untag_resource(SecretId=arn2, TagKeys=["k1"]) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.put_resource_policy(SecretId=arn2, ResourcePolicy="k1") + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.get_resource_policy(SecretId=arn2) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.delete_resource_policy(SecretId=arn2) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.delete_secret(SecretId=arn2) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.cancel_rotate_secret(SecretId=arn2) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.rotate_secret(SecretId=arn2) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.restore_secret(SecretId=arn2) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.replicate_secret_to_regions(SecretId=arn2, AddReplicaRegions=[{}]) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + with pytest.raises(ClientError) as exc: + use2.remove_regions_from_replication(SecretId=arn2, RemoveReplicaRegions=["a"]) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert err["Message"] == OP_NOT_PERMITTED + + +@mock_aws +def test_delete_while_duplication_exist(): + use1 = boto3.client("secretsmanager", region_name="us-east-1") + + region = "us-east-2" + arn = use1.create_secret( + Name="dup", SecretString="s1", AddReplicaRegions=[{"Region": region}] + )["ARN"] + + # unable to delete + with pytest.raises(ClientError) as exc: + use1.delete_secret(SecretId=arn) + err = exc.value.response["Error"] + assert err["Code"] == "InvalidParameterException" + assert ( + err["Message"] + == f"You can't delete secret {arn} that still has replica regions [{region}]" + ) + + # Remove us-east-2 replication + use1.remove_regions_from_replication(SecretId=arn, RemoveReplicaRegions=[region]) + + # Now we can delete + use1.delete_secret(SecretId=arn)