From 0ec99fae8b40476a5b811784dc342cab4ec7e525 Mon Sep 17 00:00:00 2001 From: George Lungley <54810125+glungley@users.noreply.github.com> Date: Tue, 3 Aug 2021 15:46:23 +0100 Subject: [PATCH] #4118 Add KmsKeyId Support to secretsmanager (#4119) --- moto/secretsmanager/models.py | 31 +++++++-- moto/secretsmanager/responses.py | 4 ++ .../test_secretsmanager.py | 69 ++++++++++++++++--- 3 files changed, 89 insertions(+), 15 deletions(-) diff --git a/moto/secretsmanager/models.py b/moto/secretsmanager/models.py index 360e08f81..fdd0d3f34 100644 --- a/moto/secretsmanager/models.py +++ b/moto/secretsmanager/models.py @@ -61,6 +61,7 @@ class FakeSecret: secret_binary=None, description=None, tags=[], + kms_key_id=None, version_id=None, version_stages=None, ): @@ -71,6 +72,7 @@ class FakeSecret: self.secret_binary = secret_binary self.description = description self.tags = tags + self.kms_key_id = kms_key_id self.version_id = version_id self.version_stages = version_stages self.rotation_enabled = False @@ -78,10 +80,13 @@ class FakeSecret: self.auto_rotate_after_days = 0 self.deleted_date = None - def update(self, description=None, tags=[]): + def update(self, description=None, tags=[], kms_key_id=None): self.description = description self.tags = tags + if kms_key_id is not None: + self.kms_key_id = kms_key_id + def set_versions(self, versions): self.versions = versions @@ -127,7 +132,7 @@ class FakeSecret: "ARN": self.arn, "Name": self.name, "Description": self.description or "", - "KmsKeyId": "", + "KmsKeyId": self.kms_key_id, "RotationEnabled": self.rotation_enabled, "RotationLambdaARN": self.rotation_lambda_arn, "RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days}, @@ -242,7 +247,12 @@ class SecretsManagerBackend(BaseBackend): return response def update_secret( - self, secret_id, secret_string=None, secret_binary=None, **kwargs + self, + secret_id, + secret_string=None, + secret_binary=None, + kms_key_id=None, + **kwargs ): # error if secret does not exist @@ -265,12 +275,19 @@ class SecretsManagerBackend(BaseBackend): secret_binary=secret_binary, description=description, tags=tags, + kms_key_id=kms_key_id, ) return secret.to_short_dict() def create_secret( - self, name, secret_string=None, secret_binary=None, description=None, tags=[] + self, + name, + secret_string=None, + secret_binary=None, + description=None, + tags=[], + kms_key_id=None, ): # error if secret exists @@ -285,6 +302,7 @@ class SecretsManagerBackend(BaseBackend): secret_binary=secret_binary, description=description, tags=tags, + kms_key_id=kms_key_id, ) return secret.to_short_dict() @@ -296,6 +314,7 @@ class SecretsManagerBackend(BaseBackend): secret_binary=None, description=None, tags=[], + kms_key_id=None, version_id=None, version_stages=None, ): @@ -319,7 +338,8 @@ class SecretsManagerBackend(BaseBackend): if secret_id in self.secrets: secret = self.secrets[secret_id] - secret.update(description, tags) + + secret.update(description, tags, kms_key_id) if "AWSPENDING" in version_stages: secret.versions[version_id] = secret_version @@ -333,6 +353,7 @@ class SecretsManagerBackend(BaseBackend): secret_binary=secret_binary, description=description, tags=tags, + kms_key_id=kms_key_id, ) secret.set_versions({version_id: secret_version}) secret.set_default_version_id(version_id) diff --git a/moto/secretsmanager/responses.py b/moto/secretsmanager/responses.py index 0433a565e..8dd2a8a92 100644 --- a/moto/secretsmanager/responses.py +++ b/moto/secretsmanager/responses.py @@ -46,22 +46,26 @@ class SecretsManagerResponse(BaseResponse): secret_binary = self._get_param("SecretBinary") description = self._get_param("Description", if_none="") tags = self._get_param("Tags", if_none=[]) + kms_key_id = self._get_param("KmsKeyId", if_none=None) return secretsmanager_backends[self.region].create_secret( name=name, secret_string=secret_string, secret_binary=secret_binary, description=description, tags=tags, + kms_key_id=kms_key_id, ) def update_secret(self): secret_id = self._get_param("SecretId") secret_string = self._get_param("SecretString") secret_binary = self._get_param("SecretBinary") + kms_key_id = self._get_param("KmsKeyId", if_none=None) return secretsmanager_backends[self.region].update_secret( secret_id=secret_id, secret_string=secret_string, secret_binary=secret_binary, + kms_key_id=kms_key_id, ) def get_random_password(self): diff --git a/tests/test_secretsmanager/test_secretsmanager.py b/tests/test_secretsmanager/test_secretsmanager.py index d4420a8ec..3afc23c06 100644 --- a/tests/test_secretsmanager/test_secretsmanager.py +++ b/tests/test_secretsmanager/test_secretsmanager.py @@ -455,6 +455,21 @@ def test_describe_secret_with_arn(): conn.list_secrets()["SecretList"][0]["ARN"].should.equal(results["ARN"]) +@mock_secretsmanager +def test_describe_secret_with_KmsKeyId(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + results = conn.create_secret( + Name="test-secret", SecretString="foosecret", KmsKeyId="dummy_arn" + ) + + secret_description = conn.describe_secret(SecretId=results["ARN"]) + + secret_description["KmsKeyId"].should.equal("dummy_arn") + conn.list_secrets()["SecretList"][0]["KmsKeyId"].should.equal( + secret_description["KmsKeyId"] + ) + + @mock_secretsmanager def test_describe_secret_that_does_not_exist(): conn = boto3.client("secretsmanager", region_name="us-west-2") @@ -643,7 +658,7 @@ def lambda_handler(event, context): arn = event['SecretId'] token = event['ClientRequestToken'] step = event['Step'] - + client = boto3.client("secretsmanager", region_name="us-west-2", endpoint_url="http://motoserver:5000") metadata = client.describe_secret(SecretId=arn) value = client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING") @@ -661,31 +676,31 @@ def lambda_handler(event, context): elif "AWSPENDING" not in versions[token]: print("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn)) - + if step == 'createSecret': try: client.get_secret_value(SecretId=arn, VersionId=token, VersionStage='AWSPENDING') except client.exceptions.ResourceNotFoundException: client.put_secret_value( - SecretId=arn, - ClientRequestToken=token, - SecretString=json.dumps({'create': True}), + SecretId=arn, + ClientRequestToken=token, + SecretString=json.dumps({'create': True}), VersionStages=['AWSPENDING'] ) if step == 'setSecret': client.put_secret_value( - SecretId=arn, - ClientRequestToken=token, - SecretString='UpdatedValue', + SecretId=arn, + ClientRequestToken=token, + SecretString='UpdatedValue', VersionStages=["AWSPENDING"], ) - + elif step == 'finishSecret': current_version = next( version for version, stages in metadata['VersionIdsToStages'].items() - if 'AWSCURRENT' in stages + if 'AWSCURRENT' in stages ) print("current: %s new: %s" % (current_version, token)) client.update_secret_version_stage( @@ -1000,6 +1015,40 @@ def test_update_secret_with_tags_and_description(): assert secret_details["Description"] == "desc" +@mock_secretsmanager +def test_update_secret_with_KmsKeyId(): + conn = boto3.client("secretsmanager", region_name="us-west-2") + + created_secret = conn.create_secret( + Name="test-secret", SecretString="foosecret", KmsKeyId="foo_arn" + ) + + assert created_secret["ARN"] + assert created_secret["Name"] == "test-secret" + assert created_secret["VersionId"] != "" + + secret = conn.get_secret_value(SecretId="test-secret") + assert secret["SecretString"] == "foosecret" + + secret_details = conn.describe_secret(SecretId="test-secret") + secret_details["KmsKeyId"].should.equal("foo_arn") + + updated_secret = conn.update_secret( + SecretId="test-secret", SecretString="barsecret", KmsKeyId="bar_arn" + ) + + assert updated_secret["ARN"] + assert updated_secret["Name"] == "test-secret" + assert updated_secret["VersionId"] != "" + + secret = conn.get_secret_value(SecretId="test-secret") + assert secret["SecretString"] == "barsecret" + assert created_secret["VersionId"] != updated_secret["VersionId"] + + secret_details = conn.describe_secret(SecretId="test-secret") + secret_details["KmsKeyId"].should.equal("bar_arn") + + @mock_secretsmanager def test_update_secret_which_does_not_exit(): conn = boto3.client("secretsmanager", region_name="us-west-2")