SecretsManager - put/delete_resource_policy (#6049)

This commit is contained in:
Bert Blommers 2023-03-10 22:45:21 -01:00 committed by GitHub
parent c7478495a2
commit d022b404d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 27 deletions

View File

@ -6055,11 +6055,11 @@
## secretsmanager
<details>
<summary>72% implemented</summary>
<summary>81% implemented</summary>
- [X] cancel_rotate_secret
- [X] create_secret
- [ ] delete_resource_policy
- [X] delete_resource_policy
- [X] delete_secret
- [X] describe_secret
- [X] get_random_password
@ -6067,7 +6067,7 @@
- [X] get_secret_value
- [X] list_secret_version_ids
- [X] list_secrets
- [ ] put_resource_policy
- [X] put_resource_policy
- [X] put_secret_value
- [ ] remove_regions_from_replication
- [ ] replicate_secret_to_regions

View File

@ -27,7 +27,7 @@ secretsmanager
- [X] cancel_rotate_secret
- [X] create_secret
- [ ] delete_resource_policy
- [X] delete_resource_policy
- [X] delete_secret
- [X] describe_secret
- [X] get_random_password
@ -35,7 +35,11 @@ secretsmanager
- [X] get_secret_value
- [X] list_secret_version_ids
- [X] list_secrets
- [ ] put_resource_policy
- [X] put_resource_policy
The BlockPublicPolicy-parameter is not yet implemented
- [X] put_secret_value
- [ ] remove_regions_from_replication
- [ ] replicate_secret_to_regions

View File

@ -87,6 +87,7 @@ class FakeSecret:
self.rotation_lambda_arn = ""
self.auto_rotate_after_days = 0
self.deleted_date = None
self.policy = None
def update(
self, description=None, tags=None, kms_key_id=None, last_changed_date=None
@ -825,29 +826,37 @@ class SecretsManagerBackend(BaseBackend):
return secret_id
@staticmethod
def get_resource_policy(secret_id):
resource_policy = {
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Principal": {
"AWS": [
"arn:aws:iam::111122223333:root",
"arn:aws:iam::444455556666:root",
]
},
"Action": ["secretsmanager:GetSecretValue"],
"Resource": "*",
},
def put_resource_policy(self, secret_id: str, policy: str):
"""
The BlockPublicPolicy-parameter is not yet implemented
"""
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
secret.policy = policy
return secret.arn, secret.name
def get_resource_policy(self, secret_id):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
resp = {
"ARN": secret.arn,
"Name": secret.name,
}
return json.dumps(
{
"ARN": secret_id,
"Name": secret_id,
"ResourcePolicy": json.dumps(resource_policy),
}
)
if secret.policy is not None:
resp["ResourcePolicy"] = secret.policy
return json.dumps(resp)
def delete_resource_policy(self, secret_id):
if not self._is_valid_identifier(secret_id):
raise SecretNotFoundException()
secret = self.secrets[secret_id]
secret.policy = None
return secret.arn, secret.name
secretsmanager_backends = BackendDict(SecretsManagerBackend, "secretsmanager")

View File

@ -174,6 +174,17 @@ class SecretsManagerResponse(BaseResponse):
secret_id = self._get_param("SecretId")
return self.backend.get_resource_policy(secret_id=secret_id)
def put_resource_policy(self):
secret_id = self._get_param("SecretId")
policy = self._get_param("ResourcePolicy")
arn, name = self.backend.put_resource_policy(secret_id, policy)
return json.dumps(dict(ARN=arn, Name=name))
def delete_resource_policy(self):
secret_id = self._get_param("SecretId")
arn, name = self.backend.delete_resource_policy(secret_id)
return json.dumps(dict(ARN=arn, Name=name))
def tag_resource(self):
secret_id = self._get_param("SecretId")
tags = self._get_param("Tags", if_none=[])

View File

@ -452,6 +452,14 @@ s3:
- TestAccS3ObjectsDataSource_fetchOwner
sagemaker:
- TestAccSageMakerPrebuiltECRImageDataSource
secretsmanager:
- TestAccSecretsManagerSecretDataSource_basic
- TestAccSecretsManagerSecretPolicy_
- TestAccSecretsManagerSecret_RecoveryWindowInDays_recreate
- TestAccSecretsManagerSecret_tags
- TestAccSecretsManagerSecret_kmsKeyID
- TestAccSecretsManagerSecret_withNamePrefix
- TestAccSecretsManagerSecret_rotationRules
servicediscovery:
- TestAccServiceDiscoveryDNSNamespaceDataSource
- TestAccServiceDiscoveryHTTPNamespace

View File

@ -0,0 +1,77 @@
import boto3
import json
import pytest
from botocore.exceptions import ClientError
from moto import mock_secretsmanager
@mock_secretsmanager
def test_get_initial_policy():
client = boto3.client("secretsmanager", region_name="us-west-2")
client.create_secret(Name="test-secret")
resp = client.get_resource_policy(SecretId="test-secret")
assert resp.get("Name") == "test-secret"
assert "ARN" in resp
assert "ResourcePolicy" not in resp
@mock_secretsmanager
def test_put_resource_policy():
client = boto3.client("secretsmanager", region_name="us-west-2")
client.create_secret(Name="test-secret")
policy = {
"Statement": [
{
"Action": "secretsmanager:GetSecretValue",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:role/tf-acc-test-655046176950657276"
},
"Resource": "*",
"Sid": "EnableAllPermissions",
}
],
"Version": "2012-10-17",
}
resp = client.put_resource_policy(
SecretId="test-secret", ResourcePolicy=json.dumps(policy)
)
assert "ARN" in resp
assert "Name" in resp
resp = client.get_resource_policy(SecretId="test-secret")
assert "ResourcePolicy" in resp
assert json.loads(resp["ResourcePolicy"]) == policy
@mock_secretsmanager
def test_delete_resource_policy():
client = boto3.client("secretsmanager", region_name="us-west-2")
client.create_secret(Name="test-secret")
client.put_resource_policy(SecretId="test-secret", ResourcePolicy="some policy")
client.delete_resource_policy(SecretId="test-secret")
resp = client.get_resource_policy(SecretId="test-secret")
assert "ResourcePolicy" not in resp
@mock_secretsmanager
def test_policies_for_unknown_secrets():
client = boto3.client("secretsmanager", region_name="us-west-2")
with pytest.raises(ClientError) as exc:
client.put_resource_policy(SecretId="unknown secret", ResourcePolicy="p")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
with pytest.raises(ClientError) as exc:
client.get_resource_policy(SecretId="unknown secret")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
with pytest.raises(ClientError) as exc:
client.delete_resource_policy(SecretId="unknown secret")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"