fixed default kms keys;fixed object locking exception (#4385)

This commit is contained in:
Macwan Nevil 2021-10-26 15:27:58 +05:30 committed by GitHub
parent ca09cc3fc1
commit 5771dcf73b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 144 additions and 47 deletions

View File

@ -10,7 +10,13 @@ from moto.core.utils import unix_time
from moto.utilities.tagging_service import TaggingService
from moto.core.exceptions import JsonRESTError
from .utils import decrypt, encrypt, generate_key_id, generate_master_key
from .utils import (
RESERVED_ALIASES,
decrypt,
encrypt,
generate_key_id,
generate_master_key,
)
class Key(CloudFormationModel):
@ -152,11 +158,18 @@ class Key(CloudFormationModel):
class KmsBackend(BaseBackend):
def __init__(self):
def __init__(self, region):
self.region = region
self.keys = {}
self.key_to_aliases = defaultdict(set)
self.tagger = TaggingService(key_name="TagKey", value_name="TagValue")
def reset(self):
region = self.region
self._reset_model_refs()
self.__dict__ = {}
self.__init__(region)
@staticmethod
def default_vpc_endpoint_service(service_region, zones):
"""Default VPC endpoint service."""
@ -164,6 +177,20 @@ class KmsBackend(BaseBackend):
service_region, zones, "kms"
)
def _generate_default_keys(self, alias_name):
"""Creates default kms keys """
if alias_name in RESERVED_ALIASES:
key = self.create_key(
None,
"ENCRYPT_DECRYPT",
"SYMMETRIC_DEFAULT",
"Default key",
None,
self.region,
)
self.add_alias(key.id, alias_name)
return key.id
def create_key(
self, policy, key_usage, customer_master_key_spec, description, tags, region
):
@ -190,7 +217,7 @@ class KmsBackend(BaseBackend):
# describe key not just KeyId
key_id = self.get_key_id(key_id)
if r"alias/" in str(key_id).lower():
key_id = self.get_key_id_from_alias(key_id.split("alias/")[1])
key_id = self.get_key_id_from_alias(key_id)
return self.keys[self.get_key_id(key_id)]
def list_keys(self):
@ -250,6 +277,9 @@ class KmsBackend(BaseBackend):
for key_id, aliases in dict(self.key_to_aliases).items():
if alias_name in ",".join(aliases):
return key_id
if alias_name in RESERVED_ALIASES:
key_id = self._generate_default_keys(alias_name)
return key_id
return None
def enable_key_rotation(self, key_id):
@ -383,8 +413,8 @@ class KmsBackend(BaseBackend):
kms_backends = {}
for region in Session().get_available_regions("kms"):
kms_backends[region] = KmsBackend()
kms_backends[region] = KmsBackend(region)
for region in Session().get_available_regions("kms", partition_name="aws-us-gov"):
kms_backends[region] = KmsBackend()
kms_backends[region] = KmsBackend(region)
for region in Session().get_available_regions("kms", partition_name="aws-cn"):
kms_backends[region] = KmsBackend()
kms_backends[region] = KmsBackend(region)

View File

@ -5,6 +5,7 @@ import re
from moto.core import ACCOUNT_ID
from moto.core.responses import BaseResponse
from moto.kms.utils import RESERVED_ALIASES
from .models import kms_backends
from .exceptions import (
NotFoundException,
@ -13,13 +14,6 @@ from .exceptions import (
NotAuthorizedException,
)
reserved_aliases = [
"alias/aws/ebs",
"alias/aws/s3",
"alias/aws/redshift",
"alias/aws/rds",
]
class KmsResponse(BaseResponse):
@property
@ -199,7 +193,7 @@ class KmsResponse(BaseResponse):
if not alias_name.startswith("alias/"):
raise ValidationException("Invalid identifier")
if alias_name in reserved_aliases:
if alias_name in RESERVED_ALIASES:
raise NotAuthorizedException()
if ":" in alias_name:
@ -253,22 +247,12 @@ class KmsResponse(BaseResponse):
def list_aliases(self):
"""https://docs.aws.amazon.com/kms/latest/APIReference/API_ListAliases.html"""
region = self.region
# TODO: The actual API can filter on KeyId.
response_aliases = [
{
"AliasArn": "arn:aws:kms:{region}:{account_id}:{reserved_alias}".format(
region=region, account_id=ACCOUNT_ID, reserved_alias=reserved_alias
),
"AliasName": reserved_alias,
}
for reserved_alias in reserved_aliases
]
response_aliases = []
backend_aliases = self.kms_backend.get_all_aliases()
for target_key_id, aliases in backend_aliases.items():
for alias_name in aliases:
# TODO: add creation date and last updated in response_aliases
response_aliases.append(
{
"AliasArn": "arn:aws:kms:{region}:{account_id}:{alias_name}".format(
@ -278,6 +262,21 @@ class KmsResponse(BaseResponse):
"TargetKeyId": target_key_id,
}
)
for reserved_alias in RESERVED_ALIASES:
exsisting = [
a for a in response_aliases if a["AliasName"] == reserved_alias
]
if not exsisting:
response_aliases.append(
{
"AliasArn": "arn:aws:kms:{region}:{account_id}:{reserved_alias}".format(
region=region,
account_id=ACCOUNT_ID,
reserved_alias=reserved_alias,
),
"AliasName": reserved_alias,
}
)
return json.dumps({"Truncated": False, "Aliases": response_aliases})

View File

@ -26,6 +26,23 @@ CIPHERTEXT_HEADER_FORMAT = ">{key_id_len}s{iv_len}s{tag_len}s".format(
)
Ciphertext = namedtuple("Ciphertext", ("key_id", "iv", "ciphertext", "tag"))
RESERVED_ALIASES = [
"alias/aws/acm",
"alias/aws/dynamodb",
"alias/aws/ebs",
"alias/aws/elasticfilesystem",
"alias/aws/es",
"alias/aws/glue",
"alias/aws/kinesisvideo",
"alias/aws/lambda",
"alias/aws/rds",
"alias/aws/redshift",
"alias/aws/s3",
"alias/aws/secretsmanager",
"alias/aws/ssm",
"alias/aws/xray",
]
def generate_key_id():
return str(uuid.uuid4())

View File

@ -565,3 +565,13 @@ class InvalidTagError(S3ClientError):
super(InvalidTagError, self).__init__(
"InvalidTag", value, *args, **kwargs,
)
class ObjectLockConfigurationNotFoundError(S3ClientError):
code = 404
def __init__(self):
super(ObjectLockConfigurationNotFoundError, self).__init__(
"ObjectLockConfigurationNotFoundError",
"Object Lock configuration does not exist for this bucket",
)

View File

@ -45,6 +45,7 @@ from moto.s3.exceptions import (
InvalidPublicAccessBlockConfiguration,
WrongPublicAccessBlockAccountIdError,
NoSuchUpload,
ObjectLockConfigurationNotFoundError,
InvalidTagError,
)
from .cloud_formation import cfn_to_api_encryption, is_replacement_update
@ -106,7 +107,7 @@ class FakeKey(BaseModel):
kms_key_id=None,
bucket_key_enabled=None,
lock_mode=None,
lock_legal_status="OFF",
lock_legal_status=None,
lock_until=None,
):
self.name = name
@ -270,6 +271,19 @@ class FakeKey(BaseModel):
if self.website_redirect_location:
res["x-amz-website-redirect-location"] = self.website_redirect_location
if self.lock_legal_status:
res["x-amz-object-lock-legal-hold"] = self.lock_legal_status
if self.lock_until:
res["x-amz-object-lock-retain-until-date"] = self.lock_until
if self.lock_mode:
res["x-amz-object-lock-mode"] = self.lock_mode
if self.lock_legal_status:
res["x-amz-object-lock-legal-hold"] = self.lock_legal_status
if self.lock_until:
res["x-amz-object-lock-retain-until-date"] = self.lock_until
if self.lock_mode:
res["x-amz-object-lock-mode"] = self.lock_mode
return res
@ -1164,7 +1178,7 @@ class FakeBucket(CloudFormationModel):
if "BucketEncryption" in properties:
bucket_encryption = cfn_to_api_encryption(properties["BucketEncryption"])
s3_backend.put_bucket_encryption(
bucket_name=resource_name, encryption=[bucket_encryption]
bucket_name=resource_name, encryption=bucket_encryption
)
return bucket
@ -1194,7 +1208,7 @@ class FakeBucket(CloudFormationModel):
properties["BucketEncryption"]
)
s3_backend.put_bucket_encryption(
bucket_name=original_resource.name, encryption=[bucket_encryption]
bucket_name=original_resource.name, encryption=bucket_encryption
)
return original_resource
@ -1552,7 +1566,7 @@ class S3Backend(BaseBackend):
kms_key_id=None,
bucket_key_enabled=None,
lock_mode=None,
lock_legal_status="OFF",
lock_legal_status=None,
lock_until=None,
):
key_name = clean_key_name(key_name)
@ -1561,6 +1575,24 @@ class S3Backend(BaseBackend):
bucket = self.get_bucket(bucket_name)
# getting default config from bucket if not included in put request
if bucket.encryption:
bucket_key_enabled = (
bucket_key_enabled or bucket.encryption["Rule"]["BucketKeyEnabled"]
)
kms_key_id = (
kms_key_id
or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][
"KMSMasterKeyID"
]
)
encryption = (
encryption
or bucket.encryption["Rule"]["ApplyServerSideEncryptionByDefault"][
"SSEAlgorithm"
]
)
new_key = FakeKey(
name=key_name,
value=value,
@ -1646,6 +1678,8 @@ class S3Backend(BaseBackend):
def get_object_lock_configuration(self, bucket_name):
bucket = self.get_bucket(bucket_name)
if not bucket.object_lock_enabled:
raise ObjectLockConfigurationNotFoundError
return (
bucket.object_lock_enabled,
bucket.default_lock_mode,

View File

@ -1394,7 +1394,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
lock_mode = request.headers.get("x-amz-object-lock-mode", None)
lock_until = request.headers.get("x-amz-object-lock-retain-until-date", None)
legal_hold = request.headers.get("x-amz-object-lock-legal-hold", "OFF")
legal_hold = request.headers.get("x-amz-object-lock-legal-hold", None)
if lock_mode or lock_until or legal_hold == "ON":
if not request.headers.get("Content-Md5"):
@ -1747,8 +1747,8 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
def _mode_until_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
return (
parsed_xml["Retention"]["Mode"],
parsed_xml["Retention"]["RetainUntilDate"],
parsed_xml.get("Retention", None).get("Mode", None),
parsed_xml.get("Retention", None).get("RetainUntilDate", None),
)
def _legal_hold_status_from_xml(self, xml):
@ -1769,7 +1769,7 @@ class ResponseObject(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
):
raise MalformedXML()
return [parsed_xml["ServerSideEncryptionConfiguration"]]
return parsed_xml["ServerSideEncryptionConfiguration"]
def _logging_from_xml(self, xml):
parsed_xml = xmltodict.parse(xml)
@ -2567,17 +2567,17 @@ S3_NO_LOGGING_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
S3_ENCRYPTION_CONFIG = """<?xml version="1.0" encoding="UTF-8"?>
<ServerSideEncryptionConfiguration xmlns="http://doc.s3.amazonaws.com/2006-03-01">
{% for entry in encryption %}
{% if encryption %}
<Rule>
<ApplyServerSideEncryptionByDefault>
<SSEAlgorithm>{{ entry["Rule"]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] }}</SSEAlgorithm>
{% if entry["Rule"]["ApplyServerSideEncryptionByDefault"].get("KMSMasterKeyID") %}
<KMSMasterKeyID>{{ entry["Rule"]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] }}</KMSMasterKeyID>
<SSEAlgorithm>{{ encryption["Rule"]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] }}</SSEAlgorithm>
{% if encryption["Rule"]["ApplyServerSideEncryptionByDefault"].get("KMSMasterKeyID") %}
<KMSMasterKeyID>{{ encryption["Rule"]["ApplyServerSideEncryptionByDefault"]["KMSMasterKeyID"] }}</KMSMasterKeyID>
{% endif %}
</ApplyServerSideEncryptionByDefault>
<BucketKeyEnabled>{{ 'true' if entry["Rule"].get("BucketKeyEnabled") == 'true' else 'false' }}</BucketKeyEnabled>
<BucketKeyEnabled>{{ 'true' if encryption["Rule"].get("BucketKeyEnabled") == 'true' else 'false' }}</BucketKeyEnabled>
</Rule>
{% endfor %}
{% endif %}
</ServerSideEncryptionConfiguration>
"""

View File

@ -10,4 +10,9 @@ TestAccAWSDefaultSecurityGroup_Classic_
TestAccDataSourceAwsNetworkInterface_CarrierIPAssociation
TestAccAWSRouteTable_IPv4_To_LocalGateway
TestAccAWSRouteTable_IPv4_To_VpcEndpoint
TestAccAWSRouteTable_VpcClassicLink
TestAccAWSRouteTable_VpcClassicLink
TestAccAWSS3BucketObject_NonVersioned
TestAccAWSS3BucketObject_ignoreTags
TestAccAWSS3BucketObject_updatesWithVersioningViaAccessPoint
TestAccAWSS3BucketObject_updates
TestAccAWSS3BucketObject_updatesWithVersioning

View File

@ -110,4 +110,5 @@ TestAccDataSourceAwsNetworkInterface_
TestAccAWSNatGateway
TestAccAWSRouteTable_
TestAccAWSRouteTableAssociation_
TestAccAWSS3Bucket_forceDestroyWithObjectLockEnabled
TestAccAWSS3Bucket_forceDestroyWithObjectLockEnabled
TestAccAWSS3BucketObject_

View File

@ -687,7 +687,7 @@ def test__list_aliases():
]
).should.equal(3)
len(aliases).should.equal(7)
len(aliases).should.equal(17)
@mock_kms_deprecated
@ -772,7 +772,7 @@ def test_key_tag_added_arn_based_happy():
# Has boto3 equivalent
@mock_kms_deprecated
def test_key_tagging_sad():
b = KmsBackend()
b = KmsBackend(region="eu-north-1")
try:
b.tag_resource("unknown", [])

View File

@ -208,15 +208,16 @@ def test__create_alias__can_create_multiple_aliases_for_same_key_id():
@mock_kms
def test_list_aliases():
client = boto3.client("kms", region_name="us-east-1")
region = "us-west-1"
client = boto3.client("kms", region_name=region)
client.create_key(Description="my key")
aliases = client.list_aliases()["Aliases"]
aliases.should.have.length_of(4)
aliases.should.have.length_of(14)
default_alias_names = ["aws/ebs", "aws/s3", "aws/redshift", "aws/rds"]
for name in default_alias_names:
full_name = "alias/{}".format(name)
arn = "arn:aws:kms:us-east-1:{}:{}".format(ACCOUNT_ID, full_name)
arn = "arn:aws:kms:{}:{}:{}".format(region, ACCOUNT_ID, full_name)
aliases.should.contain({"AliasName": full_name, "AliasArn": arn})