SecretsManager: Replica Secrets are now supported (#7270)

This commit is contained in:
Bert Blommers 2024-01-29 20:44:06 +00:00 committed by GitHub
parent 496c1cd559
commit fd5d7c18c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 572 additions and 43 deletions

View File

@ -7016,7 +7016,7 @@
## secretsmanager
<details>
<summary>78% implemented</summary>
<summary>86% implemented</summary>
- [ ] batch_get_secret_value
- [X] cancel_rotate_secret
@ -7031,8 +7031,8 @@
- [X] list_secrets
- [X] put_resource_policy
- [X] put_secret_value
- [ ] remove_regions_from_replication
- [ ] replicate_secret_to_regions
- [X] remove_regions_from_replication
- [X] replicate_secret_to_regions
- [X] restore_secret
- [X] rotate_secret
- [ ] stop_replication_to_replica

View File

@ -31,8 +31,8 @@ secretsmanager
- [X] put_secret_value
- [ ] remove_regions_from_replication
- [ ] replicate_secret_to_regions
- [X] remove_regions_from_replication
- [X] replicate_secret_to_regions
- [X] restore_secret
- [X] rotate_secret
- [ ] stop_replication_to_replica

View File

@ -62,3 +62,10 @@ class InvalidRequestException(SecretsManagerClientError):
class ValidationException(SecretsManagerClientError):
def __init__(self, message: str):
super().__init__("ValidationException", message)
class OperationNotPermittedOnReplica(InvalidParameterException):
def __init__(self) -> None:
super().__init__(
"Operation not permitted on a replica secret. Call must be made in primary secret's region."
)

View File

@ -1,7 +1,7 @@
import datetime
import json
import time
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union
from moto.core.base_backend import BackendDict, BaseBackend
from moto.core.common_models import BaseModel
@ -12,6 +12,7 @@ from .exceptions import (
ClientError,
InvalidParameterException,
InvalidRequestException,
OperationNotPermittedOnReplica,
ResourceExistsException,
ResourceNotFoundException,
SecretHasNoValueException,
@ -27,12 +28,24 @@ from .list_secrets.filters import (
)
from .utils import get_secret_name_from_partial_arn, random_password, secret_arn
def filter_primary_region(secret: "FakeSecret", values: List[str]) -> bool:
if isinstance(secret, FakeSecret):
return len(secret.replicas) > 0 and secret.region in values
elif isinstance(secret, ReplicaSecret):
return secret.source.region in values
_filter_functions = {
"all": filter_all,
"name": name_filter,
"description": description_filter,
"tag-key": tag_key,
"tag-value": tag_value,
"primary-region": filter_primary_region,
# Other services do not create secrets in Moto (yet)
# So if you're looking for any Secrets owned by a Service, you'll never get any results
"owning-service": lambda x, y: False,
}
@ -40,7 +53,9 @@ def filter_keys() -> List[str]:
return list(_filter_functions.keys())
def _matches(secret: "FakeSecret", filters: List[Dict[str, Any]]) -> bool:
def _matches(
secret: Union["FakeSecret", "ReplicaSecret"], filters: List[Dict[str, Any]]
) -> bool:
is_match = True
for f in filters:
@ -72,10 +87,14 @@ class FakeSecret:
version_stages: Optional[List[str]] = None,
last_changed_date: Optional[int] = None,
created_date: Optional[int] = None,
replica_regions: Optional[List[Dict[str, str]]] = None,
force_overwrite: bool = False,
):
self.secret_id = secret_id
self.name = secret_id
self.arn = secret_arn(account_id, region_name, secret_id)
self.account_id = account_id
self.region = region_name
self.secret_string = secret_string
self.secret_binary = secret_binary
self.description = description
@ -101,6 +120,36 @@ class FakeSecret:
else:
self.set_default_version_id(None)
self.replicas = self.create_replicas(
replica_regions or [], force_overwrite=force_overwrite
)
def create_replicas(
self, replica_regions: List[Dict[str, str]], force_overwrite: bool
) -> List["ReplicaSecret"]:
# Validate first, before we create anything
for replica_config in replica_regions or []:
if replica_config["Region"] == self.region:
raise InvalidParameterException("Invalid replica region.")
replicas: List[ReplicaSecret] = []
for replica_config in replica_regions or []:
replica_region = replica_config["Region"]
backend = secretsmanager_backends[self.account_id][replica_region]
if self.name in backend.secrets:
if force_overwrite:
backend.secrets.pop(self.name)
replica = ReplicaSecret(self, replica_region)
backend.secrets[replica.arn] = replica
else:
message = f"Replication failed: Secret name simple already exists in region {backend.region_name}."
replica = ReplicaSecret(self, replica_region, "Failed", message)
else:
replica = ReplicaSecret(self, replica_region)
backend.secrets[replica.arn] = replica
replicas.append(replica)
return replicas
def update(
self,
description: Optional[str] = None,
@ -162,7 +211,7 @@ class FakeSecret:
) -> str:
if not version_id:
version_id = self.default_version_id
dct = {
dct: Dict[str, Any] = {
"ARN": self.arn,
"Name": self.name,
}
@ -170,6 +219,8 @@ class FakeSecret:
dct["VersionId"] = version_id
if version_id and include_version_stages:
dct["VersionStages"] = self.versions[version_id]["version_stages"]
if self.replicas:
dct["ReplicationStatus"] = [replica.config for replica in self.replicas]
return json.dumps(dct)
def to_dict(self) -> Dict[str, Any]:
@ -209,6 +260,8 @@ class FakeSecret:
"LastRotatedDate": self.last_rotation_date,
}
)
if self.replicas:
dct["ReplicationStatus"] = [replica.config for replica in self.replicas]
return dct
def _form_version_ids_to_stages(self) -> Dict[str, str]:
@ -219,15 +272,62 @@ class FakeSecret:
return version_id_to_stages
class SecretsStore(Dict[str, FakeSecret]):
class ReplicaSecret:
def __init__(
self,
source: FakeSecret,
region: str,
status: Optional[str] = None,
message: Optional[str] = None,
):
self.source = source
self.arn = source.arn.replace(source.region, region)
self.region = region
self.status = status or "InSync"
self.message = message or "Replication succeeded"
self.has_replica = status is None
self.config = {
"Region": self.region,
"KmsKeyId": "alias/aws/secretsmanager",
"Status": self.status,
"StatusMessage": self.message,
}
def is_deleted(self) -> bool:
return False
def to_dict(self) -> Dict[str, Any]:
dct = self.source.to_dict()
dct["ARN"] = self.arn
dct["PrimaryRegion"] = self.source.region
return dct
@property
def default_version_id(self) -> Optional[str]:
return self.source.default_version_id
@property
def versions(self) -> Dict[str, Dict[str, Any]]: # type: ignore[misc]
return self.source.versions
@property
def name(self) -> str:
return self.source.name
@property
def secret_id(self) -> str:
return self.source.secret_id
class SecretsStore(Dict[str, Union[FakeSecret, ReplicaSecret]]):
# Parameters to this dictionary can be three possible values:
# names, full ARNs, and partial ARNs
# Every retrieval method should check which type of input it receives
def __setitem__(self, key: str, value: FakeSecret) -> None:
def __setitem__(self, key: str, value: Union[FakeSecret, ReplicaSecret]) -> None:
super().__setitem__(key, value)
def __getitem__(self, key: str) -> FakeSecret:
def __getitem__(self, key: str) -> Union[FakeSecret, ReplicaSecret]:
for secret in dict.values(self):
if secret.arn == key or secret.name == key:
return secret
@ -241,14 +341,14 @@ class SecretsStore(Dict[str, FakeSecret]):
name = get_secret_name_from_partial_arn(key)
return dict.__contains__(self, name) # type: ignore
def get(self, key: str) -> Optional[FakeSecret]: # type: ignore
def get(self, key: str) -> Optional[Union[FakeSecret, ReplicaSecret]]: # type: ignore
for secret in dict.values(self):
if secret.arn == key or secret.name == key:
return secret
name = get_secret_name_from_partial_arn(key)
return super().get(name)
def pop(self, key: str) -> Optional[FakeSecret]: # type: ignore
def pop(self, key: str) -> Optional[Union[FakeSecret, ReplicaSecret]]: # type: ignore
for secret in dict.values(self):
if secret.arn == key or secret.name == key:
key = secret.name
@ -295,6 +395,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
if secret.is_deleted():
raise InvalidRequestException(
"You tried to perform the operation on a secret that's currently marked deleted."
@ -391,13 +493,16 @@ class SecretsManagerBackend(BaseBackend):
if secret_id not in self.secrets:
raise SecretNotFoundException()
if self.secrets[secret_id].is_deleted():
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
if secret.is_deleted():
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
"You can't perform this operation on the secret because it was marked for deletion."
)
secret = self.secrets[secret_id]
tags = secret.tags
description = description or secret.description
@ -416,15 +521,15 @@ class SecretsManagerBackend(BaseBackend):
def create_secret(
self,
name: str,
secret_string: Optional[str] = None,
secret_binary: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[Dict[str, str]]] = None,
kms_key_id: Optional[str] = None,
client_request_token: Optional[str] = None,
secret_string: Optional[str],
secret_binary: Optional[str],
description: Optional[str],
tags: Optional[List[Dict[str, str]]],
kms_key_id: Optional[str],
client_request_token: Optional[str],
replica_regions: List[Dict[str, str]],
force_overwrite: bool,
) -> str:
# error if secret exists
if name in self.secrets.keys():
raise ResourceExistsException(
"A resource with the ID you requested already exists."
@ -438,6 +543,8 @@ class SecretsManagerBackend(BaseBackend):
tags=tags,
kms_key_id=kms_key_id,
version_id=client_request_token,
replica_regions=replica_regions,
force_overwrite=force_overwrite,
)
return secret.to_short_dict(include_version_id=new_version)
@ -452,6 +559,8 @@ class SecretsManagerBackend(BaseBackend):
kms_key_id: Optional[str] = None,
version_id: Optional[str] = None,
version_stages: Optional[List[str]] = None,
replica_regions: Optional[List[Dict[str, str]]] = None,
force_overwrite: bool = False,
) -> Tuple[FakeSecret, bool]:
if version_stages is None:
@ -475,6 +584,8 @@ class SecretsManagerBackend(BaseBackend):
update_time = int(time.time())
if secret_id in self.secrets:
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
secret.update(description, tags, kms_key_id, last_changed_date=update_time)
@ -498,6 +609,8 @@ class SecretsManagerBackend(BaseBackend):
created_date=update_time,
version_id=version_id,
secret_version=secret_version,
replica_regions=replica_regions,
force_overwrite=force_overwrite,
)
self.secrets[secret_id] = secret
@ -516,6 +629,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
else:
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
tags = secret.tags
description = secret.description
@ -533,13 +648,11 @@ class SecretsManagerBackend(BaseBackend):
return secret.to_short_dict(include_version_stages=True, version_id=version_id)
def describe_secret(self, secret_id: str) -> Dict[str, Any]:
def describe_secret(self, secret_id: str) -> Union[FakeSecret, ReplicaSecret]:
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
return secret.to_dict()
return self.secrets[secret_id]
def rotate_secret(
self,
@ -553,7 +666,10 @@ class SecretsManagerBackend(BaseBackend):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
if self.secrets[secret_id].is_deleted():
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
if secret.is_deleted():
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted."
@ -568,17 +684,13 @@ class SecretsManagerBackend(BaseBackend):
if rotation_days in rotation_rules:
rotation_period = rotation_rules[rotation_days]
if rotation_period < 1 or rotation_period > 1000:
msg = (
"RotationRules.AutomaticallyAfterDays " "must be within 1-1000."
)
msg = "RotationRules.AutomaticallyAfterDays must be within 1-1000."
raise InvalidParameterException(msg)
self.secrets[secret_id].next_rotation_date = int(time.time()) + (
secret.next_rotation_date = int(time.time()) + (
int(rotation_period) * 86400
)
secret = self.secrets[secret_id]
# The rotation function must end with the versions of the secret in
# one of two states:
#
@ -674,7 +786,7 @@ class SecretsManagerBackend(BaseBackend):
)
secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"]
self.secrets[secret_id].last_rotation_date = int(time.time())
secret.last_rotation_date = int(time.time())
return secret.to_short_dict()
def get_random_password(
@ -787,7 +899,15 @@ class SecretsManagerBackend(BaseBackend):
deletion_date = utcnow()
return arn, name, self._unix_time_secs(deletion_date)
else:
if self.secrets[secret_id].is_deleted():
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
if len(secret.replicas) > 0:
replica_regions = ",".join([rep.region for rep in secret.replicas])
msg = f"You can't delete secret {secret_id} that still has replica regions [{replica_regions}]"
raise InvalidParameterException(msg)
if secret.is_deleted():
raise InvalidRequestException(
"An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
perform the operation on a secret that's currently marked deleted."
@ -796,11 +916,10 @@ class SecretsManagerBackend(BaseBackend):
deletion_date = utcnow()
if force_delete_without_recovery:
secret = self.secrets.pop(secret_id)
self.secrets.pop(secret_id)
else:
deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
self.secrets[secret_id].delete(self._unix_time_secs(deletion_date))
secret = self.secrets.get(secret_id)
secret.delete(self._unix_time_secs(deletion_date))
if not secret:
raise SecretNotFoundException()
@ -816,6 +935,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
secret.restore()
return secret.arn, secret.name
@ -826,6 +947,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
old_tags = secret.tags
for tag in tags:
@ -847,6 +970,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
tags = secret.tags
for tag in tags:
@ -912,6 +1037,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
secret.policy = policy
return secret.arn, secret.name
@ -920,6 +1047,8 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
resp = {
"ARN": secret.arn,
"Name": secret.name,
@ -933,8 +1062,41 @@ class SecretsManagerBackend(BaseBackend):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
secret.policy = None
return secret.arn, secret.name
def replicate_secret_to_regions(
self,
secret_id: str,
replica_regions: List[Dict[str, str]],
force_overwrite: bool,
) -> Tuple[str, List[Dict[str, Any]]]:
secret = self.describe_secret(secret_id)
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
secret.replicas.extend(
secret.create_replicas(replica_regions, force_overwrite=force_overwrite)
)
statuses = [replica.config for replica in secret.replicas]
return secret_id, statuses
def remove_regions_from_replication(
self, secret_id: str, replica_regions: List[str]
) -> Tuple[str, List[Dict[str, str]]]:
secret = self.describe_secret(secret_id)
if isinstance(secret, ReplicaSecret):
raise OperationNotPermittedOnReplica
for replica in secret.replicas.copy():
if replica.region in replica_regions:
backend = secretsmanager_backends[self.account_id][replica.region]
if replica.has_replica:
dict.pop(backend.secrets, replica.arn)
secret.replicas.remove(replica)
statuses = [replica.config for replica in secret.replicas]
return secret_id, statuses
secretsmanager_backends = BackendDict(SecretsManagerBackend, "secretsmanager")

View File

@ -55,8 +55,11 @@ class SecretsManagerResponse(BaseResponse):
secret_binary = self._get_param("SecretBinary")
description = self._get_param("Description", if_none="")
tags = self._get_param("Tags", if_none=[])
kms_key_id = self._get_param("KmsKeyId", if_none=None)
client_request_token = self._get_param("ClientRequestToken", if_none=None)
kms_key_id = self._get_param("KmsKeyId")
client_request_token = self._get_param("ClientRequestToken")
replica_regions = self._get_param("AddReplicaRegions", [])
force_overwrite = self._get_bool_param("ForceOverwriteReplicaSecret", False)
return self.backend.create_secret(
name=name,
secret_string=secret_string,
@ -65,6 +68,8 @@ class SecretsManagerResponse(BaseResponse):
tags=tags,
kms_key_id=kms_key_id,
client_request_token=client_request_token,
replica_regions=replica_regions,
force_overwrite=force_overwrite,
)
def update_secret(self) -> str:
@ -108,7 +113,7 @@ class SecretsManagerResponse(BaseResponse):
def describe_secret(self) -> str:
secret_id = self._get_param("SecretId")
secret = self.backend.describe_secret(secret_id=secret_id)
return json.dumps(secret)
return json.dumps(secret.to_dict())
def rotate_secret(self) -> str:
client_request_token = self._get_param("ClientRequestToken")
@ -212,3 +217,22 @@ class SecretsManagerResponse(BaseResponse):
move_to_version_id=move_to_version_id,
)
return json.dumps({"ARN": arn, "Name": name})
def replicate_secret_to_regions(self) -> str:
secret_id = self._get_param("SecretId")
replica_regions = self._get_param("AddReplicaRegions", [])
force_overwrite = self._get_bool_param("ForceOverwriteReplicaSecret", False)
arn, statuses = self.backend.replicate_secret_to_regions(
secret_id, replica_regions, force_overwrite=force_overwrite
)
return json.dumps({"ARN": arn, "ReplicationStatus": statuses})
def remove_regions_from_replication(self) -> str:
secret_id = self._get_param("SecretId")
replica_regions = self._get_param("RemoveReplicaRegions", [])
arn, statuses = self.backend.remove_regions_from_replication(
secret_id, replica_regions
)
return json.dumps({"ARN": arn, "ReplicationStatus": statuses})

View File

@ -132,7 +132,7 @@ class ParameterDict(DefaultDict[str, List["Parameter"]]):
def _get_secretsmanager_parameter(self, secret_name: str) -> List["Parameter"]:
secrets_backend = secretsmanager_backends[self.account_id][self.region_name]
secret = secrets_backend.describe_secret(secret_name)
secret = secrets_backend.describe_secret(secret_name).to_dict()
version_id_to_stage = secret["VersionIdsToStages"]
# Sort version ID's so that AWSCURRENT is last
sorted_version_ids = [

View File

@ -269,3 +269,13 @@ def test_with_filter_with_negation():
secret_names = list(map(lambda s: s["Name"], secrets["SecretList"]))
assert secret_names == ["baz"]
@mock_aws
def test_filter_with_owning_service():
conn = boto3.client("secretsmanager", region_name="us-west-2")
conn.create_secret(Name="foo", SecretString="secret")
resp = conn.list_secrets(Filters=[{"Key": "owning-service", "Values": ["n/a"]}])
assert resp["SecretList"] == []

View File

@ -0,0 +1,326 @@
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_aws
@mock_aws
def test_filter_with_primary_region():
conn = boto3.client("secretsmanager", region_name="us-east-1")
conn.create_secret(Name="simple", SecretString="secret")
conn.create_secret(
Name="dup", SecretString="s", AddReplicaRegions=[{"Region": "us-east-2"}]
)
secrets = conn.list_secrets(
Filters=[{"Key": "primary-region", "Values": ["us-east-1"]}]
)["SecretList"]
assert len(secrets) == 1
secrets = conn.list_secrets(
Filters=[{"Key": "primary-region", "Values": ["us-east-2"]}]
)["SecretList"]
assert len(secrets) == 0
secrets = conn.list_secrets()["SecretList"]
assert len(secrets) == 2
# If our entry point is us-east-2, but filter in us-east-1
# We can see the replicas in us-east-2 that have Primary Secrets in us-east-1
use2 = boto3.client("secretsmanager", region_name="us-east-2")
secrets = use2.list_secrets(
Filters=[{"Key": "primary-region", "Values": ["us-east-1"]}]
)["SecretList"]
assert len(secrets) == 1
assert secrets[0]["PrimaryRegion"] == "us-east-1"
@mock_aws
def test_create_secret_that_duplicates():
use1 = boto3.client("secretsmanager", region_name="us-east-1")
use2 = boto3.client("secretsmanager", region_name="us-east-2")
resp = use1.create_secret(
Name="dup", SecretString="s", AddReplicaRegions=[{"Region": "us-east-2"}]
)
primary_arn = resp["ARN"]
assert resp["ReplicationStatus"][0]["Region"] == "us-east-2"
assert resp["ReplicationStatus"][0]["Status"] == "InSync"
primary_secret = use1.describe_secret(SecretId=primary_arn)
assert primary_secret["ARN"] == primary_arn
replica_arn = primary_arn.replace("us-east-1", "us-east-2")
replica_secret = use2.describe_secret(SecretId=replica_arn)
assert replica_secret["ARN"] == replica_arn
assert replica_secret["Name"] == "dup"
assert replica_secret["VersionIdsToStages"] == primary_secret["VersionIdsToStages"]
replica_value = use2.get_secret_value(SecretId=replica_arn)
assert replica_value["Name"] == "dup"
assert replica_value["SecretString"] == "s"
@mock_aws
def test_create_secret_that_duplicates__in_same_region():
conn = boto3.client("secretsmanager", region_name="us-east-1")
with pytest.raises(ClientError) as exc:
conn.create_secret(
Name="dup_same_region",
SecretString="s",
AddReplicaRegions=[{"Region": "us-east-1"}],
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == "Invalid replica region."
@mock_aws
def test_replicate_secret():
us1 = boto3.client("secretsmanager", region_name="us-east-1")
us2 = boto3.client("secretsmanager", region_name="us-east-2")
resp = us1.create_secret(Name="dup", SecretString="s")
primary_arn = resp["ARN"]
resp = us1.replicate_secret_to_regions(
SecretId=primary_arn, AddReplicaRegions=[{"Region": "us-east-2"}]
)
assert resp["ReplicationStatus"][0]["Region"] == "us-east-2"
assert resp["ReplicationStatus"][0]["Status"] == "InSync"
replica_arn = primary_arn.replace("us-east-1", "us-east-2")
replica_secret = us2.describe_secret(SecretId=replica_arn)
assert replica_secret["ARN"] == replica_arn
@mock_aws
def test_replicate_secret__in_same_region():
conn = boto3.client("secretsmanager", region_name="us-east-1")
resp = conn.create_secret(Name="dup", SecretString="s")
with pytest.raises(ClientError) as exc:
conn.replicate_secret_to_regions(
SecretId=resp["ARN"], AddReplicaRegions=[{"Region": "us-east-1"}]
)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == "Invalid replica region."
@mock_aws
def test_replicate_secret__existing_secret():
use1 = boto3.client("secretsmanager", region_name="us-east-1")
use2 = boto3.client("secretsmanager", region_name="us-east-2")
secret1 = use1.create_secret(Name="dup", SecretString="s1")["ARN"]
secret2 = use2.create_secret(Name="dup", SecretString="s2")["ARN"]
resp = use1.replicate_secret_to_regions(
SecretId=secret1, AddReplicaRegions=[{"Region": "us-east-2"}]
)
assert resp["ReplicationStatus"][0]["Region"] == "us-east-2"
assert resp["ReplicationStatus"][0]["Status"] == "Failed"
assert (
resp["ReplicationStatus"][0]["StatusMessage"]
== "Replication failed: Secret name simple already exists in region us-east-2."
)
# us-east-2 still only has one secret
assert len(use2.list_secrets()["SecretList"]) == 1
# Both secrets still have original values
assert use1.get_secret_value(SecretId=secret1)["SecretString"] == "s1"
assert use2.get_secret_value(SecretId=secret2)["SecretString"] == "s2"
@mock_aws
def test_replicate_secret__overwrite_existing_secret():
use1 = boto3.client("secretsmanager", region_name="us-east-1")
use2 = boto3.client("secretsmanager", region_name="us-east-2")
secret1 = use1.create_secret(Name="dup", SecretString="s1")["ARN"]
secret2 = use2.create_secret(Name="dup", SecretString="s2")["ARN"]
resp = use1.replicate_secret_to_regions(
SecretId=secret1,
AddReplicaRegions=[{"Region": "us-east-2"}],
ForceOverwriteReplicaSecret=True,
)
assert resp["ReplicationStatus"][0]["Status"] == "InSync"
# us-east-2 still only has one secret
assert len(use2.list_secrets()["SecretList"]) == 1
# Original Secret was yeeted
with pytest.raises(ClientError):
assert use2.get_secret_value(SecretId=secret2)["SecretString"] == "s1"
# Use ListSecrets to get the new ARN
# And verify it returns a copy of the us-east-1 secret
secret2 = use2.list_secrets()["SecretList"][0]["ARN"]
assert use2.get_secret_value(SecretId=secret2)["SecretString"] == "s1"
@mock_aws
def test_remove_regions_from_replication():
use1 = boto3.client("secretsmanager", region_name="us-east-1")
use2 = boto3.client("secretsmanager", region_name="us-east-2")
usw2 = boto3.client("secretsmanager", region_name="us-west-2")
arn1 = use1.create_secret(Name="dup", SecretString="s1")["ARN"]
arn2 = arn1.replace("us-east-1", "us-east-2")
arn3 = arn1.replace("us-east-1", "us-west-2")
# Replicate to multiple regions
use1.replicate_secret_to_regions(
SecretId=arn1,
AddReplicaRegions=[{"Region": "us-east-2"}, {"Region": "us-west-2"}],
)
# Replications exist
use2.describe_secret(SecretId=arn2)
usw2.describe_secret(SecretId=arn3)
# Primary Secret knows about replicas
replications = use1.describe_secret(SecretId=arn1)["ReplicationStatus"]
assert set([rep["Region"] for rep in replications]) == {"us-east-2", "us-west-2"}
# Remove us-east-2 replication
use1.remove_regions_from_replication(
SecretId=arn1, RemoveReplicaRegions=["us-east-2"]
)
with pytest.raises(ClientError):
use2.describe_secret(SecretId=arn2)
# us-west still exists
usw2.describe_secret(SecretId=arn3)
# Primary Secret no longer knows about us-east-2
replications = use1.describe_secret(SecretId=arn1)["ReplicationStatus"]
assert set([rep["Region"] for rep in replications]) == {"us-west-2"}
# Removing region is idempotent - invoking it again does not result in any errors
use1.remove_regions_from_replication(
SecretId=arn1, RemoveReplicaRegions=["us-east-2"]
)
@mock_aws
def test_update_replica():
use1 = boto3.client("secretsmanager", region_name="us-east-1")
use2 = boto3.client("secretsmanager", region_name="us-east-2")
arn1 = use1.create_secret(
Name="dup", SecretString="s1", AddReplicaRegions=[{"Region": "us-east-2"}]
)["ARN"]
arn2 = arn1.replace("us-east-1", "us-east-2")
OP_NOT_PERMITTED = "Operation not permitted on a replica secret. Call must be made in primary secret's region."
with pytest.raises(ClientError) as exc:
use2.update_secret(SecretId=arn2, SecretString="asdf")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.put_secret_value(SecretId=arn2, SecretString="asdf")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.tag_resource(SecretId=arn2, Tags=[{"Key": "k", "Value": "v"}])
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.untag_resource(SecretId=arn2, TagKeys=["k1"])
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.put_resource_policy(SecretId=arn2, ResourcePolicy="k1")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.get_resource_policy(SecretId=arn2)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.delete_resource_policy(SecretId=arn2)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.delete_secret(SecretId=arn2)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.cancel_rotate_secret(SecretId=arn2)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.rotate_secret(SecretId=arn2)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.restore_secret(SecretId=arn2)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.replicate_secret_to_regions(SecretId=arn2, AddReplicaRegions=[{}])
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
with pytest.raises(ClientError) as exc:
use2.remove_regions_from_replication(SecretId=arn2, RemoveReplicaRegions=["a"])
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert err["Message"] == OP_NOT_PERMITTED
@mock_aws
def test_delete_while_duplication_exist():
use1 = boto3.client("secretsmanager", region_name="us-east-1")
region = "us-east-2"
arn = use1.create_secret(
Name="dup", SecretString="s1", AddReplicaRegions=[{"Region": region}]
)["ARN"]
# unable to delete
with pytest.raises(ClientError) as exc:
use1.delete_secret(SecretId=arn)
err = exc.value.response["Error"]
assert err["Code"] == "InvalidParameterException"
assert (
err["Message"]
== f"You can't delete secret {arn} that still has replica regions [{region}]"
)
# Remove us-east-2 replication
use1.remove_regions_from_replication(SecretId=arn, RemoveReplicaRegions=[region])
# Now we can delete
use1.delete_secret(SecretId=arn)