Add test for case where ebs volume has no tags.
This commit adds a test for a case where an EBS volume has no tags. When an EBS volume has no tags, calls to the aws ec2 endpoints `create_volume` and `describe_volumes` do not include the `Tags` key in the `response.Volumes[]` object. However, moto does include the `Tags` key in this case. This discrepancy in behaviour can result in code passing a moto test but failing in production. Sample snippets that trigger this condition: ``` def create_volume_and_then_get_tags_from_response(): client = boto3.client('ec2', region_name='us-east-1') volume_response = client.create_volume( Size=10, AvailabilityZone='us-east-1a' ) keys = volume_response['Keys'] ``` ``` def create_volume_and_then_get_tags_from_describe_volumes(): client = boto3.client('ec2', region_name='us-east-1') volume_response = client.create_volume( Size=10, AvailabilityZone='us-east-1a' ) volume_describe_response = client.describe_volumes() keys = volume_describe_response['Volumes'][0]['Keys'] ``` Both sample snippets will succeed in a moto test, but fail with a `KeyError` when using the aws api.
This commit is contained in:
parent
7eaf6bf595
commit
15b3ede3cc
@ -589,6 +589,18 @@ def test_volume_tag_escaping():
|
|||||||
dict(snaps[0].tags).should.equal({'key': '</closed>'})
|
dict(snaps[0].tags).should.equal({'key': '</closed>'})
|
||||||
|
|
||||||
|
|
||||||
|
@mock_ec2
|
||||||
|
def test_volume_property_hidden_when_no_tags_exist():
|
||||||
|
ec2_client = boto3.client('ec2', region_name='us-east-1')
|
||||||
|
|
||||||
|
volume_response = ec2_client.create_volume(
|
||||||
|
Size=10,
|
||||||
|
AvailabilityZone='us-east-1a'
|
||||||
|
)
|
||||||
|
|
||||||
|
volume_response.get('Tags').should.equal(None)
|
||||||
|
|
||||||
|
|
||||||
@freeze_time
|
@freeze_time
|
||||||
@mock_ec2
|
@mock_ec2
|
||||||
def test_copy_snapshot():
|
def test_copy_snapshot():
|
||||||
@ -602,26 +614,26 @@ def test_copy_snapshot():
|
|||||||
create_snapshot_response = ec2_client.create_snapshot(
|
create_snapshot_response = ec2_client.create_snapshot(
|
||||||
VolumeId=volume_response['VolumeId']
|
VolumeId=volume_response['VolumeId']
|
||||||
)
|
)
|
||||||
|
|
||||||
copy_snapshot_response = dest_ec2_client.copy_snapshot(
|
copy_snapshot_response = dest_ec2_client.copy_snapshot(
|
||||||
SourceSnapshotId=create_snapshot_response['SnapshotId'],
|
SourceSnapshotId=create_snapshot_response['SnapshotId'],
|
||||||
SourceRegion="eu-west-1"
|
SourceRegion="eu-west-1"
|
||||||
)
|
)
|
||||||
|
|
||||||
ec2 = boto3.resource('ec2', region_name='eu-west-1')
|
ec2 = boto3.resource('ec2', region_name='eu-west-1')
|
||||||
dest_ec2 = boto3.resource('ec2', region_name='eu-west-2')
|
dest_ec2 = boto3.resource('ec2', region_name='eu-west-2')
|
||||||
|
|
||||||
source = ec2.Snapshot(create_snapshot_response['SnapshotId'])
|
source = ec2.Snapshot(create_snapshot_response['SnapshotId'])
|
||||||
dest = dest_ec2.Snapshot(copy_snapshot_response['SnapshotId'])
|
dest = dest_ec2.Snapshot(copy_snapshot_response['SnapshotId'])
|
||||||
|
|
||||||
attribs = ['data_encryption_key_id', 'encrypted',
|
attribs = ['data_encryption_key_id', 'encrypted',
|
||||||
'kms_key_id', 'owner_alias', 'owner_id',
|
'kms_key_id', 'owner_alias', 'owner_id',
|
||||||
'progress', 'state', 'state_message',
|
'progress', 'state', 'state_message',
|
||||||
'tags', 'volume_id', 'volume_size']
|
'tags', 'volume_id', 'volume_size']
|
||||||
|
|
||||||
for attrib in attribs:
|
for attrib in attribs:
|
||||||
getattr(source, attrib).should.equal(getattr(dest, attrib))
|
getattr(source, attrib).should.equal(getattr(dest, attrib))
|
||||||
|
|
||||||
# Copy from non-existent source ID.
|
# Copy from non-existent source ID.
|
||||||
with assert_raises(ClientError) as cm:
|
with assert_raises(ClientError) as cm:
|
||||||
create_snapshot_error = ec2_client.create_snapshot(
|
create_snapshot_error = ec2_client.create_snapshot(
|
||||||
|
Loading…
Reference in New Issue
Block a user