Add KMS Support to EBS Encrypted Volumes (#3383)

* Properly coerce `Encrypted` attribute to bool on request/response.
* Create and use a default AWS managed CMK for EBS when clients request
  an encrypted volume without specifying a KmsKeyId.

NOTE: A client-provided KmsKeyId is simply stored as-is, and is not validated
against the KMS backend. This is in keeping with other moto backends (RDS, Redshift)
that currently also accept unvalidated customer master key (CMK) parameters, but could
be an area for future improvement.

Closes #3248
This commit is contained in:
Brian Pandola 2020-10-14 07:18:50 -07:00 committed by GitHub
parent ea19466c38
commit ccda76898a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 124 additions and 7 deletions

View File

@ -573,3 +573,13 @@ class InvalidLaunchTemplateNameError(EC2ClientError):
"InvalidLaunchTemplateName.AlreadyExistsException",
"Launch template name already in use.",
)
class InvalidParameterDependency(EC2ClientError):
def __init__(self, param, param_needed):
super(InvalidParameterDependency, self).__init__(
"InvalidParameterDependency",
"The parameter [{0}] requires the parameter {1} to be set.".format(
param, param_needed
),
)

View File

@ -28,6 +28,7 @@ from moto.core.utils import (
camelcase_to_underscores,
)
from moto.core import ACCOUNT_ID
from moto.kms import kms_backends
from .exceptions import (
CidrLimitExceeded,
@ -97,6 +98,7 @@ from .exceptions import (
ResourceAlreadyAssociatedError,
RulesPerSecurityGroupLimitExceededError,
TagLimitExceeded,
InvalidParameterDependency,
)
from .utils import (
EC2_RESOURCE_TO_PREFIX,
@ -2425,7 +2427,14 @@ class VolumeAttachment(CloudFormationModel):
class Volume(TaggedEC2Resource, CloudFormationModel):
def __init__(
self, ec2_backend, volume_id, size, zone, snapshot_id=None, encrypted=False
self,
ec2_backend,
volume_id,
size,
zone,
snapshot_id=None,
encrypted=False,
kms_key_id=None,
):
self.id = volume_id
self.size = size
@ -2435,6 +2444,7 @@ class Volume(TaggedEC2Resource, CloudFormationModel):
self.snapshot_id = snapshot_id
self.ec2_backend = ec2_backend
self.encrypted = encrypted
self.kms_key_id = kms_key_id
@staticmethod
def cloudformation_name_type():
@ -2548,7 +2558,13 @@ class EBSBackend(object):
self.snapshots = {}
super(EBSBackend, self).__init__()
def create_volume(self, size, zone_name, snapshot_id=None, encrypted=False):
def create_volume(
self, size, zone_name, snapshot_id=None, encrypted=False, kms_key_id=None
):
if kms_key_id and not encrypted:
raise InvalidParameterDependency("KmsKeyId", "Encrypted")
if encrypted and not kms_key_id:
kms_key_id = self._get_default_encryption_key()
volume_id = random_volume_id()
zone = self.get_zone_by_name(zone_name)
if snapshot_id:
@ -2557,7 +2573,7 @@ class EBSBackend(object):
size = snapshot.volume.size
if snapshot.encrypted:
encrypted = snapshot.encrypted
volume = Volume(self, volume_id, size, zone, snapshot_id, encrypted)
volume = Volume(self, volume_id, size, zone, snapshot_id, encrypted, kms_key_id)
self.volumes[volume_id] = volume
return volume
@ -2705,6 +2721,25 @@ class EBSBackend(object):
return True
def _get_default_encryption_key(self):
# https://aws.amazon.com/kms/features/#AWS_Service_Integration
# An AWS managed CMK is created automatically when you first create
# an encrypted resource using an AWS service integrated with KMS.
kms = kms_backends[self.region_name]
ebs_alias = "alias/aws/ebs"
if not kms.alias_exists(ebs_alias):
key = kms.create_key(
policy="",
key_usage="ENCRYPT_DECRYPT",
customer_master_key_spec="SYMMETRIC_DEFAULT",
description="Default master key that protects my EBS volumes when no other key is defined",
tags=None,
region=self.region_name,
)
kms.add_alias(key.id, ebs_alias)
ebs_key = kms.describe_key(ebs_alias)
return ebs_key.arn
class VPC(TaggedEC2Resource, CloudFormationModel):
def __init__(

View File

@ -46,9 +46,12 @@ class ElasticBlockStore(BaseResponse):
snapshot_id = self._get_param("SnapshotId")
tags = self._parse_tag_specification("TagSpecification")
volume_tags = tags.get("volume", {})
encrypted = self._get_param("Encrypted", if_none=False)
encrypted = self._get_bool_param("Encrypted", if_none=False)
kms_key_id = self._get_param("KmsKeyId")
if self.is_not_dryrun("CreateVolume"):
volume = self.ec2_backend.create_volume(size, zone, snapshot_id, encrypted)
volume = self.ec2_backend.create_volume(
size, zone, snapshot_id, encrypted, kms_key_id
)
volume.add_tags(volume_tags)
template = self.response_template(CREATE_VOLUME_RESPONSE)
return template.render(volume=volume)
@ -161,7 +164,10 @@ CREATE_VOLUME_RESPONSE = """<CreateVolumeResponse xmlns="http://ec2.amazonaws.co
{% else %}
<snapshotId/>
{% endif %}
<encrypted>{{ volume.encrypted }}</encrypted>
<encrypted>{{ 'true' if volume.encrypted else 'false' }}</encrypted>
{% if volume.encrypted %}
<kmsKeyId>{{ volume.kms_key_id }}</kmsKeyId>
{% endif %}
<availabilityZone>{{ volume.zone.name }}</availabilityZone>
<status>creating</status>
<createTime>{{ volume.create_time}}</createTime>
@ -192,7 +198,10 @@ DESCRIBE_VOLUMES_RESPONSE = """<DescribeVolumesResponse xmlns="http://ec2.amazon
{% else %}
<snapshotId/>
{% endif %}
<encrypted>{{ volume.encrypted }}</encrypted>
<encrypted>{{ 'true' if volume.encrypted else 'false' }}</encrypted>
{% if volume.encrypted %}
<kmsKeyId>{{ volume.kms_key_id }}</kmsKeyId>
{% endif %}
<availabilityZone>{{ volume.zone.name }}</availabilityZone>
<status>{{ volume.status }}</status>
<createTime>{{ volume.create_time}}</createTime>

View File

@ -13,6 +13,7 @@ import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2
from moto.ec2.models import OWNER_ID
from moto.kms import mock_kms
@mock_ec2_deprecated
@ -915,3 +916,65 @@ def test_search_for_many_snapshots():
snapshots_response = ec2_client.describe_snapshots(SnapshotIds=snapshot_ids)
assert len(snapshots_response["Snapshots"]) == len(snapshot_ids)
@mock_ec2
def test_create_unencrypted_volume_with_kms_key_fails():
resource = boto3.resource("ec2", region_name="us-east-1")
with assert_raises(ClientError) as ex:
resource.create_volume(
AvailabilityZone="us-east-1a", Encrypted=False, KmsKeyId="key", Size=10
)
ex.exception.response["Error"]["Code"].should.equal("InvalidParameterDependency")
ex.exception.response["Error"]["Message"].should.contain("KmsKeyId")
@mock_kms
@mock_ec2
def test_create_encrypted_volume_without_kms_key_should_use_default_key():
kms = boto3.client("kms", region_name="us-east-1")
# Default master key for EBS does not exist until needed.
with assert_raises(ClientError) as ex:
kms.describe_key(KeyId="alias/aws/ebs")
ex.exception.response["Error"]["Code"].should.equal("NotFoundException")
# Creating an encrypted volume should create (and use) the default key.
resource = boto3.resource("ec2", region_name="us-east-1")
volume = resource.create_volume(
AvailabilityZone="us-east-1a", Encrypted=True, Size=10
)
default_ebs_key_arn = kms.describe_key(KeyId="alias/aws/ebs")["KeyMetadata"]["Arn"]
volume.kms_key_id.should.equal(default_ebs_key_arn)
volume.encrypted.should.be.true
# Subsequent encrypted volumes should use the now-created default key.
volume = resource.create_volume(
AvailabilityZone="us-east-1a", Encrypted=True, Size=10
)
volume.kms_key_id.should.equal(default_ebs_key_arn)
volume.encrypted.should.be.true
@mock_ec2
def test_create_volume_with_kms_key():
resource = boto3.resource("ec2", region_name="us-east-1")
volume = resource.create_volume(
AvailabilityZone="us-east-1a", Encrypted=True, KmsKeyId="key", Size=10
)
volume.kms_key_id.should.equal("key")
volume.encrypted.should.be.true
@mock_ec2
def test_kms_key_id_property_hidden_when_volume_not_encrypted():
client = boto3.client("ec2", region_name="us-east-1")
resp = client.create_volume(AvailabilityZone="us-east-1a", Encrypted=False, Size=10)
resp["Encrypted"].should.be.false
resp.should_not.have.key("KmsKeyId")
resp = client.describe_volumes(VolumeIds=[resp["VolumeId"]])
resp["Volumes"][0]["Encrypted"].should.be.false
resp["Volumes"][0].should_not.have.key("KmsKeyId")
resource = boto3.resource("ec2", region_name="us-east-1")
volume = resource.create_volume(
AvailabilityZone="us-east-1a", Encrypted=False, Size=10
)
volume.encrypted.should.be.false
volume.kms_key_id.should.be.none