moto/tests/test_ec2/test_instances.py
tony-dot-sh 9feabf5479
Enhancement: implement EC2 instance filtering by subnet-id (#3694)
Co-authored-by: Tony Greising-Murschel <tony@platform.sh>
2021-02-15 16:38:40 +00:00

1702 lines
61 KiB
Python

from __future__ import unicode_literals
from botocore.exceptions import ClientError
import pytest
from unittest import SkipTest
import base64
import ipaddress
import six
import boto
import boto3
from boto.ec2.instance import Reservation, InstanceAttribute
from boto.exception import EC2ResponseError
from freezegun import freeze_time
import sure # noqa
from moto import mock_ec2_deprecated, mock_ec2, settings
from tests import EXAMPLE_AMI_ID
from tests.helpers import requires_boto_gte
# Pick the base64 decoder by interpreter: ``decodestring`` on Python 2,
# ``decodebytes`` otherwise. Only the selected attribute is ever looked up.
decode_method = base64.decodestring if six.PY2 else base64.decodebytes
################ Test Readme ###############
def add_servers(ami_id, count):
    """Launch ``count`` instances of ``ami_id``, one ``run_instances`` call each."""
    ec2 = boto.connect_ec2()
    for _ in range(count):
        ec2.run_instances(ami_id)
@mock_ec2_deprecated
def test_add_servers():
    """Two launches via ``add_servers`` show up as two reservations of the AMI."""
    add_servers(EXAMPLE_AMI_ID, 2)

    all_reservations = boto.connect_ec2().get_all_reservations()
    assert len(all_reservations) == 2

    first_instance = all_reservations[0].instances[0]
    assert first_instance.image_id == EXAMPLE_AMI_ID
############################################
@freeze_time("2014-01-01 05:00:00")
@mock_ec2_deprecated
def test_instance_launch_and_terminate():
    """Walk one instance through launch, describe and terminate.

    Covers the dry-run rejection of both ``run_instances`` and
    ``terminate_instances``, the attributes reported for the launched
    instance (state, launch time, VPC, placement, root volume) and the
    final ``terminated`` state.
    """
    conn = boto.ec2.connect_to_region("us-east-1")

    # A dry run must be rejected with DryRunOperation instead of launching.
    with pytest.raises(EC2ResponseError) as ex:
        conn.run_instances(EXAMPLE_AMI_ID, dry_run=True)
    ex.value.error_code.should.equal("DryRunOperation")
    ex.value.status.should.equal(400)
    ex.value.message.should.equal(
        "An error occurred (DryRunOperation) when calling the RunInstance operation: Request would have succeeded, but DryRun flag is set"
    )

    reservation = conn.run_instances(EXAMPLE_AMI_ID)
    reservation.should.be.a(Reservation)
    reservation.instances.should.have.length_of(1)
    launched_instance = reservation.instances[0]
    launched_instance.state.should.equal("pending")

    reservations = conn.get_all_reservations()
    reservations.should.have.length_of(1)
    reservations[0].id.should.equal(reservation.id)
    instances = reservations[0].instances
    instances.should.have.length_of(1)
    instance = instances[0]
    # Compare against the instance returned by run_instances; the previous
    # check compared the described instance's id with itself.
    instance.id.should.equal(launched_instance.id)
    instance.state.should.equal("running")
    # The clock is frozen by the decorator, so the launch time is fixed.
    instance.launch_time.should.equal("2014-01-01T05:00:00.000Z")
    instance.vpc_id.shouldnt.equal(None)
    instance.placement.should.equal("us-east-1a")

    # The root device must be backed by an in-use volume attached to the instance.
    root_device_name = instance.root_device_name
    instance.block_device_mapping[root_device_name].status.should.equal("in-use")
    volume_id = instance.block_device_mapping[root_device_name].volume_id
    volume_id.should.match(r"vol-\w+")

    volume = conn.get_all_volumes(volume_ids=[volume_id])[0]
    volume.attach_data.instance_id.should.equal(instance.id)
    volume.status.should.equal("in-use")

    with pytest.raises(EC2ResponseError) as ex:
        conn.terminate_instances([instance.id], dry_run=True)
    ex.value.error_code.should.equal("DryRunOperation")
    ex.value.status.should.equal(400)
    ex.value.message.should.equal(
        "An error occurred (DryRunOperation) when calling the TerminateInstance operation: Request would have succeeded, but DryRun flag is set"
    )

    conn.terminate_instances([instance.id])

    reservations = conn.get_all_reservations()
    instance = reservations[0].instances[0]
    instance.state.should.equal("terminated")
@mock_ec2
def test_instance_terminate_discard_volumes():
    """A volume created with DeleteOnTermination=True is gone after termination.

    Once the instance has reached the terminated state, listing every
    volume through the resource must yield nothing.
    """
    ec2_resource = boto3.resource("ec2", "us-west-1")

    result = ec2_resource.create_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        BlockDeviceMappings=[
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {"VolumeSize": 50, "DeleteOnTermination": True},
            }
        ],
    )
    instance = result[0]

    # The volume ids were previously collected here but never read; the
    # assertion below inspects the full volume listing instead.
    instance.terminate()
    instance.wait_until_terminated()

    assert not list(ec2_resource.volumes.all())
@mock_ec2
def test_instance_terminate_keep_volumes_explicit():
    """A volume created with DeleteOnTermination=False outlives the instance.

    After termination exactly one volume must still be listed.
    """
    ec2_resource = boto3.resource("ec2", "us-west-1")

    result = ec2_resource.create_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        BlockDeviceMappings=[
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {"VolumeSize": 50, "DeleteOnTermination": False},
            }
        ],
    )
    instance = result[0]

    # The volume ids were previously collected here but never read; the
    # assertion below counts the full volume listing instead.
    instance.terminate()
    instance.wait_until_terminated()

    assert len(list(ec2_resource.volumes.all())) == 1
@mock_ec2
def test_instance_terminate_keep_volumes_implicit():
    """Without a DeleteOnTermination flag the volume ends up ``available``."""
    ec2_resource = boto3.resource("ec2", "us-west-1")

    device_mappings = [{"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": 50}}]
    instance = ec2_resource.create_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        BlockDeviceMappings=device_mappings,
    )[0]

    # Snapshot the attached volume ids before the instance goes away.
    instance_volume_ids = [vol.volume_id for vol in instance.volumes.all()]

    instance.terminate()
    instance.wait_until_terminated()

    assert len(instance_volume_ids) == 1
    surviving_volume = ec2_resource.Volume(instance_volume_ids[0])
    surviving_volume.state.should.equal("available")
@mock_ec2
def test_instance_terminate_detach_volumes():
    """Volumes detached before termination are still listed afterwards."""
    ec2_resource = boto3.resource("ec2", "us-west-1")

    device_mappings = [
        {"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": 50}},
        {"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": 50}},
    ]
    instance = ec2_resource.create_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        BlockDeviceMappings=device_mappings,
    )[0]

    # Every detach request must report the transitional "detaching" state.
    for attached in instance.volumes.all():
        detach_result = instance.detach_volume(VolumeId=attached.volume_id)
        detach_result["State"].should.equal("detaching")

    instance.terminate()
    instance.wait_until_terminated()

    remaining = list(ec2_resource.volumes.all())
    assert len(remaining) == 2
@mock_ec2
def test_instance_detach_volume_wrong_path():
    """Detaching with a device name the volume is not on raises NotFound."""
    ec2_resource = boto3.resource("ec2", "us-west-1")

    instance = ec2_resource.create_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        BlockDeviceMappings=[{"DeviceName": "/dev/sda1", "Ebs": {"VolumeSize": 50}}],
    )[0]

    for attached in instance.volumes.all():
        # The volume was created on /dev/sda1, so /dev/sdf is the wrong device.
        with pytest.raises(ClientError) as ex:
            instance.detach_volume(VolumeId=attached.volume_id, Device="/dev/sdf")

        error_response = ex.value.response
        error_response["Error"]["Code"].should.equal("InvalidAttachment.NotFound")
        error_response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
        error_response["Error"]["Message"].should.equal(
            "The volume {0} is not attached to instance {1} as device {2}".format(
                attached.volume_id, instance.instance_id, "/dev/sdf"
            )
        )
@mock_ec2_deprecated
def test_terminate_empty_instances():
    """Terminating an empty list of instance ids raises EC2ResponseError."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    with pytest.raises(EC2ResponseError):
        ec2.terminate_instances([])
@freeze_time("2014-01-01 05:00:00")
@mock_ec2_deprecated
def test_instance_attach_volume():
    """Attach three volumes and check the mapping and attachment metadata."""
    conn = boto.connect_ec2("the_key", "the_secret")
    instance = conn.run_instances(EXAMPLE_AMI_ID).instances[0]

    # Same create -> attach -> refresh sequence for each (size, device) pair.
    for size, device in ((36, "/dev/sda1"), (65, "/dev/sdb1"), (130, "/dev/sdc1")):
        new_volume = conn.create_volume(size=size, zone=conn.region.name)
        new_volume.attach(instance.id, device)
        new_volume.update()

    # Re-read the instance so the block device mapping reflects the attachments.
    instance = conn.get_all_reservations()[0].instances[0]
    instance.block_device_mapping.should.have.length_of(3)

    last_volume_id = instance.block_device_mapping["/dev/sdc1"].volume_id
    for attached in conn.get_all_volumes(volume_ids=[last_volume_id]):
        attached.attach_data.instance_id.should.equal(instance.id)
        # Comparing timestamps is only possible because freeze_time pins the clock.
        attached.attach_data.attach_time.should.equal(instance.launch_time)
        attached.create_time.should.equal(instance.launch_time)
        attached.region.name.should.equal(instance.region.name)
        attached.status.should.equal("in-use")
@mock_ec2_deprecated
def test_get_instances_by_id():
    """Look up reservations by explicit instance ids, including an unknown id."""
    conn = boto.connect_ec2()
    first, second = conn.run_instances(EXAMPLE_AMI_ID, min_count=2).instances

    # One id -> the single reservation, trimmed to that instance.
    matches = conn.get_all_reservations(instance_ids=[first.id])
    matches.should.have.length_of(1)
    matched = matches[0]
    matched.instances.should.have.length_of(1)
    matched.instances[0].id.should.equal(first.id)

    # Both ids -> still one reservation, now holding both instances in order.
    matches = conn.get_all_reservations(instance_ids=[first.id, second.id])
    matches.should.have.length_of(1)
    matched = matches[0]
    matched.instances.should.have.length_of(2)
    returned_ids = [member.id for member in matched.instances]
    returned_ids.should.equal([first.id, second.id])

    # A request containing an unknown id must fail as a whole.
    with pytest.raises(EC2ResponseError) as cm:
        conn.get_all_reservations(instance_ids=[first.id, "i-1234abcd"])
    failure = cm.value
    failure.code.should.equal("InvalidInstanceID.NotFound")
    failure.status.should.equal(400)
    failure.request_id.should_not.be.none
@mock_ec2
def test_get_paginated_instances():
    """Page through 100 reservations, 50 at a time, via ``NextToken``."""
    client = boto3.client("ec2", region_name="us-east-1")
    resource = boto3.resource("ec2", "us-east-1")

    # One create_instances call per instance, so each gets its own reservation.
    for _ in range(100):
        resource.create_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)

    first_page = client.describe_instances(MaxResults=50)
    reservations = first_page["Reservations"]
    reservations.should.have.length_of(50)
    next_token = first_page["NextToken"]
    next_token.should_not.be.none

    second_page = client.describe_instances(NextToken=next_token)
    reservations.extend(second_page["Reservations"])
    reservations.should.have.length_of(100)
    # The last page must not advertise a further token.
    assert "NextToken" not in second_page
@mock_ec2
def test_create_with_tags():
    """Tags from several ``instance`` tag specifications are all applied."""
    ec2 = boto3.client("ec2", region_name="us-west-2")

    # Two specifications for the same resource type: 2 tags + 1 tag.
    tag_specifications = [
        {
            "ResourceType": "instance",
            "Tags": [
                {"Key": "MY_TAG1", "Value": "MY_VALUE1"},
                {"Key": "MY_TAG2", "Value": "MY_VALUE2"},
            ],
        },
        {
            "ResourceType": "instance",
            "Tags": [{"Key": "MY_TAG3", "Value": "MY_VALUE3"}],
        },
    ]
    response = ec2.run_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        InstanceType="t2.micro",
        TagSpecifications=tag_specifications,
    )

    assert "Tags" in response["Instances"][0]
    len(response["Instances"][0]["Tags"]).should.equal(3)
@mock_ec2_deprecated
def test_get_instances_filtering_by_state():
    """Filter reservations on ``instance-state-name``, with and without ids."""
    conn = boto.connect_ec2()
    first, second, third = conn.run_instances(EXAMPLE_AMI_ID, min_count=3).instances
    conn.terminate_instances([first.id])

    running_filter = {"instance-state-name": "running"}

    # Only the two instances that were not terminated are running.
    running = conn.get_all_reservations(filters=running_filter)
    running.should.have.length_of(1)
    running_ids = [member.id for member in running[0].instances]
    set(running_ids).should.equal(set([second.id, third.id]))

    # Combining an explicit id with the filter narrows to that one instance.
    running = conn.get_all_reservations([second.id], filters=running_filter)
    running.should.have.length_of(1)
    running_ids = [member.id for member in running[0].instances]
    running_ids.should.equal([second.id])

    # The same id does not match the "terminated" state.
    terminated = conn.get_all_reservations(
        [second.id], filters={"instance-state-name": "terminated"}
    )
    list(terminated).should.equal([])

    # Without any filter all three instances are still returned.
    everything = conn.get_all_reservations()
    everything[0].instances.should.have.length_of(3)

    conn.get_all_reservations.when.called_with(
        filters={"not-implemented-filter": "foobar"}
    ).should.throw(NotImplementedError)
@mock_ec2_deprecated
def test_get_instances_filtering_by_instance_id():
    """Filter reservations on ``instance-id`` with one, two and unknown ids."""
    conn = boto.connect_ec2()
    first, second, _third = conn.run_instances(EXAMPLE_AMI_ID, min_count=3).instances

    # A single id yields exactly that instance.
    single = conn.get_all_reservations(filters={"instance-id": first.id})
    single[0].instances.should.have.length_of(1)
    single[0].instances[0].id.should.equal(first.id)

    # A list of two ids yields two instances.
    pair = conn.get_all_reservations(filters={"instance-id": [first.id, second.id]})
    pair[0].instances.should.have.length_of(2)

    # An id that matches nothing yields no reservations at all.
    nothing = conn.get_all_reservations(filters={"instance-id": "non-existing-id"})
    nothing.should.have.length_of(0)
@mock_ec2_deprecated
def test_get_instances_filtering_by_instance_type():
    """Filter reservations on ``instance-type`` across three separate launches."""
    conn = boto.connect_ec2()
    small_a = conn.run_instances(EXAMPLE_AMI_ID, instance_type="m1.small").instances[0]
    small_b = conn.run_instances(EXAMPLE_AMI_ID, instance_type="m1.small").instances[0]
    micro = conn.run_instances(EXAMPLE_AMI_ID, instance_type="t1.micro").instances[0]

    # m1.small matches the first two launches, one reservation each.
    found = conn.get_all_reservations(filters={"instance-type": "m1.small"})
    found.should.have.length_of(2)
    for entry in found:
        entry.instances.should.have.length_of(1)
    found_ids = [entry.instances[0].id for entry in found]
    set(found_ids).should.equal(set([small_a.id, small_b.id]))

    # t1.micro matches only the third launch.
    found = conn.get_all_reservations(filters={"instance-type": "t1.micro"})
    found.should.have.length_of(1)
    found[0].instances.should.have.length_of(1)
    found[0].instances[0].id.should.equal(micro.id)

    # A list of types matches the union of both.
    found = conn.get_all_reservations(
        filters={"instance-type": ["t1.micro", "m1.small"]}
    )
    found.should.have.length_of(3)
    for entry in found:
        entry.instances.should.have.length_of(1)
    found_ids = [entry.instances[0].id for entry in found]
    set(found_ids).should.equal(set([small_a.id, small_b.id, micro.id]))

    # An unknown instance type matches nothing.
    found = conn.get_all_reservations(filters={"instance-type": "bogus"})
    found.should.have.length_of(0)
@mock_ec2_deprecated
def test_get_instances_filtering_by_reason_code():
    """Filter reservations on ``state-reason-code`` after a stop and a terminate."""
    conn = boto.connect_ec2()
    stopped, terminated, untouched = conn.run_instances(
        EXAMPLE_AMI_ID, min_count=3
    ).instances
    stopped.stop()
    terminated.terminate()

    # Both the stopped and the terminated instance carry the shutdown reason.
    shut_down = conn.get_all_reservations(
        filters={"state-reason-code": "Client.UserInitiatedShutdown"}
    )
    shut_down[0].instances.should.have.length_of(2)
    set([stopped.id, terminated.id]).should.equal(
        set([member.id for member in shut_down[0].instances])
    )

    # An empty reason code matches only the instance that was left alone.
    no_reason = conn.get_all_reservations(filters={"state-reason-code": ""})
    no_reason[0].instances.should.have.length_of(1)
    no_reason[0].instances[0].id.should.equal(untouched.id)
@mock_ec2_deprecated
def test_get_instances_filtering_by_source_dest_check():
    """Filter reservations on ``source-dest-check`` after disabling it on one instance."""
    conn = boto.connect_ec2()
    unchecked, checked = conn.run_instances(EXAMPLE_AMI_ID, min_count=2).instances
    conn.modify_instance_attribute(
        unchecked.id, attribute="sourceDestCheck", value=False
    )

    with_check_off = conn.get_all_reservations(filters={"source-dest-check": "false"})
    with_check_on = conn.get_all_reservations(filters={"source-dest-check": "true"})

    # Only the modified instance has the check disabled.
    with_check_off[0].instances.should.have.length_of(1)
    with_check_off[0].instances[0].id.should.equal(unchecked.id)

    # The other instance still has the check enabled.
    with_check_on[0].instances.should.have.length_of(1)
    with_check_on[0].instances[0].id.should.equal(checked.id)
@mock_ec2_deprecated
def test_get_instances_filtering_by_vpc_id():
    """Filter reservations on ``vpc-id`` with one instance in each of two VPCs."""
    conn = boto.connect_vpc("the_key", "the_secret")

    # First VPC / subnet / instance.
    vpc_a = conn.create_vpc("10.0.0.0/16")
    subnet_a = conn.create_subnet(vpc_a.id, "10.0.0.0/27")
    instance_a = conn.run_instances(
        EXAMPLE_AMI_ID, min_count=1, subnet_id=subnet_a.id
    ).instances[0]

    # Second VPC / subnet / instance.
    vpc_b = conn.create_vpc("10.1.0.0/16")
    subnet_b = conn.create_subnet(vpc_b.id, "10.1.0.0/27")
    instance_b = conn.run_instances(
        EXAMPLE_AMI_ID, min_count=1, subnet_id=subnet_b.id
    ).instances[0]

    in_vpc_a = conn.get_all_reservations(filters={"vpc-id": vpc_a.id})
    in_vpc_a.should.have.length_of(1)
    in_vpc_a[0].instances.should.have.length_of(1)
    found_a = in_vpc_a[0].instances[0]
    found_a.id.should.equal(instance_a.id)
    found_a.vpc_id.should.equal(vpc_a.id)
    found_a.subnet_id.should.equal(subnet_a.id)

    in_vpc_b = conn.get_all_reservations(filters={"vpc-id": vpc_b.id})
    in_vpc_b.should.have.length_of(1)
    in_vpc_b[0].instances.should.have.length_of(1)
    found_b = in_vpc_b[0].instances[0]
    found_b.id.should.equal(instance_b.id)
    found_b.vpc_id.should.equal(vpc_b.id)
    found_b.subnet_id.should.equal(subnet_b.id)
@mock_ec2_deprecated
def test_get_instances_filtering_by_architecture():
    """Filtering on ``architecture=x86_64`` returns the launched instance."""
    conn = boto.connect_ec2()
    # The returned reservation is not needed; the unused ``instance`` local
    # (which actually held the whole instance list) has been dropped.
    conn.run_instances(EXAMPLE_AMI_ID, min_count=1)

    reservations = conn.get_all_reservations(filters={"architecture": "x86_64"})
    # get_all_reservations should return the instance
    reservations[0].instances.should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_image_id():
    """Filtering on ``image-id`` returns the instance launched from that AMI."""
    client = boto3.client("ec2", region_name="us-east-1")
    resource = boto3.resource("ec2", "us-east-1")
    resource.create_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)

    image_filter = {"Name": "image-id", "Values": [EXAMPLE_AMI_ID]}
    response = client.describe_instances(Filters=[image_filter])

    response["Reservations"][0]["Instances"].should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_account_id():
    """Filtering on ``owner-id`` 123456789012 returns the launched instance."""
    client = boto3.client("ec2", region_name="us-east-1")
    resource = boto3.resource("ec2", "us-east-1")
    resource.create_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)

    owner_filter = {"Name": "owner-id", "Values": ["123456789012"]}
    response = client.describe_instances(Filters=[owner_filter])

    response["Reservations"][0]["Instances"].should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_private_dns():
    """Filtering on ``private-dns-name`` finds the instance with IP 10.0.0.1."""
    client = boto3.client("ec2", region_name="us-east-1")
    resource = boto3.resource("ec2", "us-east-1")
    resource.create_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1, PrivateIpAddress="10.0.0.1"
    )

    dns_filter = {"Name": "private-dns-name", "Values": ["ip-10-0-0-1.ec2.internal"]}
    response = client.describe_instances(Filters=[dns_filter])

    response["Reservations"][0]["Instances"].should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_ni_private_dns():
    """Filtering on ``network-interface.private-dns-name`` finds the instance."""
    client = boto3.client("ec2", region_name="us-west-2")
    resource = boto3.resource("ec2", "us-west-2")
    resource.create_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1, PrivateIpAddress="10.0.0.1"
    )

    interface_dns_filter = {
        "Name": "network-interface.private-dns-name",
        "Values": ["ip-10-0-0-1.us-west-2.compute.internal"],
    }
    response = client.describe_instances(Filters=[interface_dns_filter])

    response["Reservations"][0]["Instances"].should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_instance_group_name():
    """Filtering on ``instance.group-name`` finds the instance in that group."""
    client = boto3.client("ec2", region_name="us-east-1")
    client.create_security_group(Description="test", GroupName="test_sg")
    client.run_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1, SecurityGroups=["test_sg"]
    )

    group_name_filter = {"Name": "instance.group-name", "Values": ["test_sg"]}
    response = client.describe_instances(Filters=[group_name_filter])

    response["Reservations"][0]["Instances"].should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_instance_group_id():
    """Filtering on ``instance.group-id`` finds the instance in that group."""
    client = boto3.client("ec2", region_name="us-east-1")
    created_group = client.create_security_group(
        Description="test", GroupName="test_sg"
    )
    security_group_id = created_group["GroupId"]
    # The instance joins the group by name; the filter below uses its id.
    client.run_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1, SecurityGroups=["test_sg"]
    )

    group_id_filter = {"Name": "instance.group-id", "Values": [security_group_id]}
    response = client.describe_instances(Filters=[group_id_filter])

    response["Reservations"][0]["Instances"].should.have.length_of(1)
@mock_ec2
def test_get_instances_filtering_by_subnet_id():
    """Filtering on ``subnet-id`` returns the reservation launched in that subnet."""
    client = boto3.client("ec2", region_name="us-east-1")

    # Parse the CIDRs through ipaddress before handing them over as strings.
    vpc_block = ipaddress.ip_network("192.168.42.0/24")
    subnet_block = ipaddress.ip_network("192.168.42.0/25")

    created_vpc = client.create_vpc(CidrBlock=str(vpc_block))
    vpc_id = created_vpc["Vpc"]["VpcId"]

    created_subnet = client.create_subnet(CidrBlock=str(subnet_block), VpcId=vpc_id)
    subnet_id = created_subnet["Subnet"]["SubnetId"]

    client.run_instances(
        ImageId=EXAMPLE_AMI_ID, MaxCount=1, MinCount=1, SubnetId=subnet_id
    )

    subnet_filter = {"Name": "subnet-id", "Values": [subnet_id]}
    matches = client.describe_instances(Filters=[subnet_filter])["Reservations"]
    matches.should.have.length_of(1)
@mock_ec2_deprecated
def test_get_instances_filtering_by_tag():
    """Filter reservations on ``tag:<key>`` with single, combined and list values.

    Tag layout used by the assertions:
      instance1: tag1=value1, tag2=value2
      instance2: tag1=value1, tag2="wrong value"
      instance3: tag2=value2
    """
    conn = boto.connect_ec2()
    reservation = conn.run_instances(EXAMPLE_AMI_ID, min_count=3)
    instance1, instance2, instance3 = reservation.instances
    instance1.add_tag("tag1", "value1")
    instance1.add_tag("tag2", "value2")
    instance2.add_tag("tag1", "value1")
    instance2.add_tag("tag2", "wrong value")
    instance3.add_tag("tag2", "value2")

    reservations = conn.get_all_reservations(filters={"tag:tag0": "value0"})
    # No instance carries tag0, so nothing is returned.
    reservations.should.have.length_of(0)

    reservations = conn.get_all_reservations(filters={"tag:tag1": "value1"})
    # Both instances with tag1=value1 are returned.
    reservations.should.have.length_of(1)
    reservations[0].instances.should.have.length_of(2)
    reservations[0].instances[0].id.should.equal(instance1.id)
    reservations[0].instances[1].id.should.equal(instance2.id)

    # Two tag filters are combined: only instance1 has both values. This
    # query and its assertions used to appear twice verbatim; one copy is
    # enough.
    reservations = conn.get_all_reservations(
        filters={"tag:tag1": "value1", "tag:tag2": "value2"}
    )
    reservations.should.have.length_of(1)
    reservations[0].instances.should.have.length_of(1)
    reservations[0].instances[0].id.should.equal(instance1.id)

    reservations = conn.get_all_reservations(filters={"tag:tag2": ["value2", "bogus"]})
    # A list of values matches any of them: instance1 and instance3 have
    # tag2=value2, nobody has tag2=bogus.
    reservations.should.have.length_of(1)
    reservations[0].instances.should.have.length_of(2)
    reservations[0].instances[0].id.should.equal(instance1.id)
    reservations[0].instances[1].id.should.equal(instance3.id)
@mock_ec2_deprecated
def test_get_instances_filtering_by_tag_value():
    """Filter reservations on ``tag-value`` regardless of the tag key."""
    conn = boto.connect_ec2()
    first, second, third = conn.run_instances(EXAMPLE_AMI_ID, min_count=3).instances
    first.add_tag("tag1", "value1")
    first.add_tag("tag2", "value2")
    second.add_tag("tag1", "value1")
    second.add_tag("tag2", "wrong value")
    third.add_tag("tag2", "value2")

    # No tag holds value0.
    matches = conn.get_all_reservations(filters={"tag-value": "value0"})
    matches.should.have.length_of(0)

    # value1 is set on the first and second instance.
    matches = conn.get_all_reservations(filters={"tag-value": "value1"})
    matches.should.have.length_of(1)
    members = matches[0].instances
    members.should.have.length_of(2)
    members[0].id.should.equal(first.id)
    members[1].id.should.equal(second.id)

    # Either value2 or value1 covers all three instances.
    matches = conn.get_all_reservations(filters={"tag-value": ["value2", "value1"]})
    matches.should.have.length_of(1)
    members = matches[0].instances
    members.should.have.length_of(3)
    members[0].id.should.equal(first.id)
    members[1].id.should.equal(second.id)
    members[2].id.should.equal(third.id)

    # value2 is on the first and third instance; "bogus" adds nothing.
    matches = conn.get_all_reservations(filters={"tag-value": ["value2", "bogus"]})
    matches.should.have.length_of(1)
    members = matches[0].instances
    members.should.have.length_of(2)
    members[0].id.should.equal(first.id)
    members[1].id.should.equal(third.id)
@mock_ec2_deprecated
def test_get_instances_filtering_by_tag_name():
    """Filter reservations on ``tag-key`` using value-less tags."""
    conn = boto.connect_ec2()
    first, second, third = conn.run_instances(EXAMPLE_AMI_ID, min_count=3).instances
    first.add_tag("tag1")
    first.add_tag("tag2")
    second.add_tag("tag1")
    second.add_tag("tag2X")
    third.add_tag("tag3")

    # No instance has a tag named tagX.
    matches = conn.get_all_reservations(filters={"tag-key": "tagX"})
    matches.should.have.length_of(0)

    # tag1 exists on the first and second instance.
    matches = conn.get_all_reservations(filters={"tag-key": "tag1"})
    matches.should.have.length_of(1)
    members = matches[0].instances
    members.should.have.length_of(2)
    members[0].id.should.equal(first.id)
    members[1].id.should.equal(second.id)

    # tag1 or tag3 together cover all three instances.
    matches = conn.get_all_reservations(filters={"tag-key": ["tag1", "tag3"]})
    matches.should.have.length_of(1)
    members = matches[0].instances
    members.should.have.length_of(3)
    members[0].id.should.equal(first.id)
    members[1].id.should.equal(second.id)
    members[2].id.should.equal(third.id)
@mock_ec2_deprecated
def test_instance_start_and_stop():
    """Stop two instances, restart one, and check the dry-run rejections."""
    conn = boto.connect_ec2("the_key", "the_secret")
    launched = conn.run_instances(EXAMPLE_AMI_ID, min_count=2).instances
    launched.should.have.length_of(2)
    launched_ids = [member.id for member in launched]

    # Dry-run stop is refused.
    with pytest.raises(EC2ResponseError) as ex:
        conn.stop_instances(launched_ids, dry_run=True)
    ex.value.error_code.should.equal("DryRunOperation")
    ex.value.status.should.equal(400)
    ex.value.message.should.equal(
        "An error occurred (DryRunOperation) when calling the StopInstance operation: Request would have succeeded, but DryRun flag is set"
    )

    # A real stop reports every instance as "stopping".
    for halted in conn.stop_instances(launched_ids):
        halted.state.should.equal("stopping")

    # Dry-run start is refused.
    with pytest.raises(EC2ResponseError) as ex:
        conn.start_instances([launched[0].id], dry_run=True)
    ex.value.error_code.should.equal("DryRunOperation")
    ex.value.status.should.equal(400)
    ex.value.message.should.equal(
        "An error occurred (DryRunOperation) when calling the StartInstance operation: Request would have succeeded, but DryRun flag is set"
    )

    # A real start reports the instance as "pending".
    restarted = conn.start_instances([launched[0].id])
    restarted[0].state.should.equal("pending")
@mock_ec2_deprecated
def test_instance_reboot():
    """Reboot an instance; the dry-run variant must be refused first."""
    conn = boto.connect_ec2("the_key", "the_secret")
    target = conn.run_instances(EXAMPLE_AMI_ID).instances[0]

    with pytest.raises(EC2ResponseError) as ex:
        target.reboot(dry_run=True)
    dry_run_error = ex.value
    dry_run_error.error_code.should.equal("DryRunOperation")
    dry_run_error.status.should.equal(400)
    dry_run_error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the RebootInstance operation: Request would have succeeded, but DryRun flag is set"
    )

    target.reboot()
    target.state.should.equal("pending")
@mock_ec2_deprecated
def test_instance_attribute_instance_type():
    """Change ``instanceType`` and read it back; dry-run must be refused."""
    conn = boto.connect_ec2("the_key", "the_secret")
    target = conn.run_instances(EXAMPLE_AMI_ID).instances[0]

    with pytest.raises(EC2ResponseError) as ex:
        target.modify_attribute("instanceType", "m1.small", dry_run=True)
    dry_run_error = ex.value
    dry_run_error.error_code.should.equal("DryRunOperation")
    dry_run_error.status.should.equal(400)
    dry_run_error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the ModifyInstanceType operation: Request would have succeeded, but DryRun flag is set"
    )

    target.modify_attribute("instanceType", "m1.small")

    fetched = target.get_attribute("instanceType")
    fetched.should.be.a(InstanceAttribute)
    fetched.get("instanceType").should.equal("m1.small")
@mock_ec2_deprecated
def test_modify_instance_attribute_security_groups():
    """Replace the instance's ``groupSet`` and confirm both groups are present."""
    conn = boto.connect_ec2("the_key", "the_secret")
    target = conn.run_instances(EXAMPLE_AMI_ID).instances[0]

    first_group = conn.create_security_group(
        "test security group", "this is a test security group"
    )
    sg_id = first_group.id
    second_group = conn.create_security_group(
        "test security group 2", "this is a test security group 2"
    )
    sg_id2 = second_group.id

    with pytest.raises(EC2ResponseError) as ex:
        target.modify_attribute("groupSet", [sg_id, sg_id2], dry_run=True)
    dry_run_error = ex.value
    dry_run_error.error_code.should.equal("DryRunOperation")
    dry_run_error.status.should.equal(400)
    dry_run_error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the ModifyInstanceSecurityGroups operation: Request would have succeeded, but DryRun flag is set"
    )

    target.modify_attribute("groupSet", [sg_id, sg_id2])

    fetched = target.get_attribute("groupSet")
    fetched.should.be.a(InstanceAttribute)
    assigned_groups = fetched.get("groupSet")
    # Each of the two new groups must appear somewhere in the group set.
    any(g.id == sg_id for g in assigned_groups).should.be.ok
    any(g.id == sg_id2 for g in assigned_groups).should.be.ok
@mock_ec2_deprecated
def test_instance_attribute_user_data():
    """Set ``userData`` and read it back; dry-run must be refused."""
    conn = boto.connect_ec2("the_key", "the_secret")
    target = conn.run_instances(EXAMPLE_AMI_ID).instances[0]

    with pytest.raises(EC2ResponseError) as ex:
        target.modify_attribute("userData", "this is my user data", dry_run=True)
    dry_run_error = ex.value
    dry_run_error.error_code.should.equal("DryRunOperation")
    dry_run_error.status.should.equal(400)
    dry_run_error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the ModifyUserData operation: Request would have succeeded, but DryRun flag is set"
    )

    target.modify_attribute("userData", "this is my user data")

    fetched = target.get_attribute("userData")
    fetched.should.be.a(InstanceAttribute)
    fetched.get("userData").should.equal("this is my user data")
@mock_ec2_deprecated
def test_instance_attribute_source_dest_check():
    """Toggle ``sourceDestCheck`` off and on again, verifying both views of it.

    The instance object exposes the value as the strings "true"/"false",
    while ``get_attribute`` yields real booleans.
    """
    conn = boto.connect_ec2("the_key", "the_secret")
    target = conn.run_instances(EXAMPLE_AMI_ID).instances[0]

    # Freshly launched: the check is enabled.
    target.sourceDestCheck.should.equal("true")
    fetched = target.get_attribute("sourceDestCheck")
    fetched.should.be.a(InstanceAttribute)
    fetched.get("sourceDestCheck").should.equal(True)

    # Disabling it as a dry run is refused (Boto sends the bool as a string).
    with pytest.raises(EC2ResponseError) as ex:
        target.modify_attribute("sourceDestCheck", False, dry_run=True)
    dry_run_error = ex.value
    dry_run_error.error_code.should.equal("DryRunOperation")
    dry_run_error.status.should.equal(400)
    dry_run_error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the ModifySourceDestCheck operation: Request would have succeeded, but DryRun flag is set"
    )

    # Disable for real and refresh the local object.
    target.modify_attribute("sourceDestCheck", False)
    target.update()
    target.sourceDestCheck.should.equal("false")
    fetched = target.get_attribute("sourceDestCheck")
    fetched.should.be.a(InstanceAttribute)
    fetched.get("sourceDestCheck").should.equal(False)

    # Re-enable and verify the round trip.
    target.modify_attribute("sourceDestCheck", True)
    target.update()
    target.sourceDestCheck.should.equal("true")
    fetched = target.get_attribute("sourceDestCheck")
    fetched.should.be.a(InstanceAttribute)
    fetched.get("sourceDestCheck").should.equal(True)
@mock_ec2_deprecated
def test_user_data_with_run_instance():
    """User data passed at launch comes back base64-encoded and decodes cleanly."""
    user_data = b"some user data"
    conn = boto.connect_ec2("the_key", "the_secret")
    target = conn.run_instances(EXAMPLE_AMI_ID, user_data=user_data).instances[0]

    fetched = target.get_attribute("userData")
    fetched.should.be.a(InstanceAttribute)

    # The attribute is text; turn it into bytes before base64-decoding it.
    encoded_payload = fetched.get("userData").encode("utf-8")
    decode_method(encoded_payload).should.equal(b"some user data")
@mock_ec2_deprecated
def test_run_instance_with_security_group_name():
    """Launch into a security group referenced by name."""
    conn = boto.connect_ec2("the_key", "the_secret")

    # Creating the group as a dry run is refused.
    with pytest.raises(EC2ResponseError) as ex:
        conn.create_security_group("group1", "some description", dry_run=True)
    dry_run_error = ex.value
    dry_run_error.error_code.should.equal("DryRunOperation")
    dry_run_error.status.should.equal(400)
    dry_run_error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the CreateSecurityGroup operation: Request would have succeeded, but DryRun flag is set"
    )

    created_group = conn.create_security_group("group1", "some description")

    launched = conn.run_instances(
        EXAMPLE_AMI_ID, security_groups=["group1"]
    ).instances[0]
    launched.groups[0].id.should.equal(created_group.id)
    launched.groups[0].name.should.equal("group1")
@mock_ec2_deprecated
def test_run_instance_with_security_group_id():
    """Launch an instance into a security group referenced by its id."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    created_group = ec2.create_security_group("group1", "some description")
    launched = ec2.run_instances(
        EXAMPLE_AMI_ID, security_group_ids=[created_group.id]
    )

    attached_group = launched.instances[0].groups[0]
    attached_group.id.should.equal(created_group.id)
    attached_group.name.should.equal("group1")
@mock_ec2_deprecated
def test_run_instance_with_instance_type():
    """The instance type requested at launch is reported on the instance."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    launched = ec2.run_instances(EXAMPLE_AMI_ID, instance_type="t1.micro")
    launched.instances[0].instance_type.should.equal("t1.micro")
@mock_ec2_deprecated
def test_run_instance_with_default_placement():
    """Without an explicit placement, a us-east-1 launch lands in us-east-1a."""
    ec2 = boto.ec2.connect_to_region("us-east-1")
    launched = ec2.run_instances(EXAMPLE_AMI_ID)
    launched.instances[0].placement.should.equal("us-east-1a")
@mock_ec2_deprecated
def test_run_instance_with_placement():
    """An explicit placement passed at launch is reported on the instance."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    launched = ec2.run_instances(EXAMPLE_AMI_ID, placement="us-east-1b")
    launched.instances[0].placement.should.equal("us-east-1b")
@mock_ec2
def test_run_instance_with_subnet_boto3():
    """Instances launched into a subnet get a private IP from that subnet."""
    ec2 = boto3.client("ec2", region_name="eu-central-1")

    # (VPC CIDR, subnet CIDR) pairs to exercise.
    cidr_pairs = [
        (ipaddress.ip_network("10.0.0.0/16"), ipaddress.ip_network("10.0.99.0/24")),
        (
            ipaddress.ip_network("192.168.42.0/24"),
            ipaddress.ip_network("192.168.42.0/25"),
        ),
    ]

    for vpc_network, subnet_network in cidr_pairs:
        vpc_id = ec2.create_vpc(
            CidrBlock=str(vpc_network),
            AmazonProvidedIpv6CidrBlock=False,
            DryRun=False,
            InstanceTenancy="default",
        )["Vpc"]["VpcId"]

        subnet_id = ec2.create_subnet(
            CidrBlock=str(subnet_network), VpcId=vpc_id
        )["Subnet"]["SubnetId"]

        launched = ec2.run_instances(
            ImageId=EXAMPLE_AMI_ID, MaxCount=1, MinCount=1, SubnetId=subnet_id
        )["Instances"][0]
        launched["SubnetId"].should.equal(subnet_id)

        # The assigned private address has to fall inside the subnet range.
        private_address = ipaddress.ip_address(
            six.text_type(launched["PrivateIpAddress"])
        )
        subnet_network.should.contain(private_address)
@mock_ec2
def test_run_instance_with_specified_private_ipv4():
    """A private IPv4 address requested at launch is the one assigned."""
    ec2 = boto3.client("ec2", region_name="eu-central-1")
    vpc_network = ipaddress.ip_network("192.168.42.0/24")
    subnet_network = ipaddress.ip_network("192.168.42.0/25")

    vpc_id = ec2.create_vpc(
        CidrBlock=str(vpc_network),
        AmazonProvidedIpv6CidrBlock=False,
        DryRun=False,
        InstanceTenancy="default",
    )["Vpc"]["VpcId"]
    subnet_id = ec2.create_subnet(CidrBlock=str(subnet_network), VpcId=vpc_id)[
        "Subnet"
    ]["SubnetId"]

    launched = ec2.run_instances(
        ImageId=EXAMPLE_AMI_ID,
        MaxCount=1,
        MinCount=1,
        SubnetId=subnet_id,
        PrivateIpAddress="192.168.42.5",
    )["Instances"][0]

    launched["SubnetId"].should.equal(subnet_id)
    launched["PrivateIpAddress"].should.equal("192.168.42.5")
@mock_ec2
def test_run_instance_mapped_public_ipv4():
    """With MapPublicIpOnLaunch set on the subnet, the instance gets public data."""
    ec2 = boto3.client("ec2", region_name="eu-central-1")
    vpc_network = ipaddress.ip_network("192.168.42.0/24")
    subnet_network = ipaddress.ip_network("192.168.42.0/25")

    vpc_id = ec2.create_vpc(
        CidrBlock=str(vpc_network),
        AmazonProvidedIpv6CidrBlock=False,
        DryRun=False,
        InstanceTenancy="default",
    )["Vpc"]["VpcId"]
    subnet_id = ec2.create_subnet(CidrBlock=str(subnet_network), VpcId=vpc_id)[
        "Subnet"
    ]["SubnetId"]

    # Turn on public IP mapping before launching into the subnet.
    ec2.modify_subnet_attribute(
        SubnetId=subnet_id, MapPublicIpOnLaunch={"Value": True}
    )

    launched = ec2.run_instances(
        ImageId=EXAMPLE_AMI_ID, MaxCount=1, MinCount=1, SubnetId=subnet_id
    )["Instances"][0]

    # Both public fields must be present, and both must be non-empty.
    public_fields = ("PublicDnsName", "PublicIpAddress")
    for field in public_fields:
        launched.should.contain(field)
    for field in public_fields:
        len(launched[field]).should.be.greater_than(0)
@mock_ec2_deprecated
def test_run_instance_with_nic_autocreated():
    """Launching into a subnet yields a single ENI carrying the launch settings.

    The ENI is expected to share the instance's subnet, both security groups
    (one given by name, one by id) and the requested private IP.
    """
    vpc_conn = boto.connect_vpc("the_key", "the_secret")
    network = vpc_conn.create_vpc("10.0.0.0/16")
    subnet = vpc_conn.create_subnet(network.id, "10.0.0.0/18")
    group_by_name = vpc_conn.create_security_group(
        "test security group #1", "this is a test security group"
    )
    group_by_id = vpc_conn.create_security_group(
        "test security group #2", "this is a test security group"
    )
    requested_ip = "10.0.0.1"
    expected_group_ids = {group_by_name.id, group_by_id.id}

    launched = vpc_conn.run_instances(
        EXAMPLE_AMI_ID,
        subnet_id=subnet.id,
        security_groups=[group_by_name.name],
        security_group_ids=[group_by_id.id],
        private_ip_address=requested_ip,
    )
    server = launched.instances[0]

    # Exactly one network interface exists, and it belongs to the instance.
    known_enis = vpc_conn.get_all_network_interfaces()
    known_enis.should.have.length_of(1)
    eni = known_enis[0]
    server.interfaces.should.have.length_of(1)
    server.interfaces[0].id.should.equal(eni.id)

    # Instance-side view of subnet and groups.
    server.subnet_id.should.equal(subnet.id)
    server.groups.should.have.length_of(2)
    {group.id for group in server.groups}.should.equal(expected_group_ids)

    # ENI-side view of subnet, groups and private address.
    eni.subnet_id.should.equal(subnet.id)
    eni.groups.should.have.length_of(2)
    {group.id for group in eni.groups}.should.equal(expected_group_ids)
    eni.private_ip_addresses.should.have.length_of(1)
    eni.private_ip_addresses[0].private_ip_address.should.equal(requested_ip)
@mock_ec2_deprecated
def test_run_instance_with_nic_preexisting():
    """Launch an instance on top of an ENI that was created beforehand.

    The instance should reuse that ENI, and the ENI should end up with both
    its own security group and the one passed at launch.
    """
    vpc_conn = boto.connect_vpc("the_key", "the_secret")
    network = vpc_conn.create_vpc("10.0.0.0/16")
    subnet = vpc_conn.create_subnet(network.id, "10.0.0.0/18")
    eni_group = vpc_conn.create_security_group(
        "test security group #1", "this is a test security group"
    )
    launch_group = vpc_conn.create_security_group(
        "test security group #2", "this is a test security group"
    )
    # NOTE(review): this address is outside the subnet's 10.0.0.0/18 range;
    # the test relies on it being accepted anyway.
    requested_ip = "54.0.0.1"
    existing_eni = vpc_conn.create_network_interface(
        subnet.id, requested_ip, groups=[eni_group.id]
    )

    # Boto wants a NetworkInterfaceCollection of NetworkInterfaceSpecification
    # objects in order to generate the desired querystring.
    from boto.ec2.networkinterface import (
        NetworkInterfaceSpecification,
        NetworkInterfaceCollection,
    )

    eni_spec = NetworkInterfaceSpecification(
        network_interface_id=existing_eni.id, device_index=0
    )
    eni_collection = NetworkInterfaceCollection(eni_spec)

    launched = vpc_conn.run_instances(
        EXAMPLE_AMI_ID,
        network_interfaces=eni_collection,
        security_group_ids=[launch_group.id],
    )
    server = launched.instances[0]

    server.subnet_id.should.equal(subnet.id)

    # No additional interface was created for the launch.
    known_enis = vpc_conn.get_all_network_interfaces()
    known_enis.should.have.length_of(1)

    server.interfaces.should.have.length_of(1)
    attached_eni = server.interfaces[0]
    attached_eni.id.should.equal(existing_eni.id)

    attached_eni.subnet_id.should.equal(subnet.id)
    attached_eni.groups.should.have.length_of(2)
    {group.id for group in attached_eni.groups}.should.equal(
        {eni_group.id, launch_group.id}
    )

    attached_eni.private_ip_addresses.should.have.length_of(1)
    attached_eni.private_ip_addresses[0].private_ip_address.should.equal(
        requested_ip
    )
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_instance_with_nic_attach_detach():
    """Attach an ENI to an existing instance, then detach it again.

    Interface counts and security groups are checked at every stage. Both
    operations are tried as dry runs first, and detaching an unknown
    attachment id is expected to fail.
    """
    vpc_conn = boto.connect_vpc("the_key", "the_secret")
    network = vpc_conn.create_vpc("10.0.0.0/16")
    subnet = vpc_conn.create_subnet(network.id, "10.0.0.0/18")

    instance_group = vpc_conn.create_security_group(
        "test security group #1", "this is a test security group"
    )
    eni_group = vpc_conn.create_security_group(
        "test security group #2", "this is a test security group"
    )
    eni_only_ids = {eni_group.id}
    merged_ids = {instance_group.id, eni_group.id}

    def group_ids(holder):
        # Ids of the security groups attached to an instance or ENI object.
        return {group.id for group in holder.groups}

    def fetch_eni(eni_id):
        # Fetch a fresh copy of a single ENI by its id.
        return vpc_conn.get_all_network_interfaces(
            filters={"network-interface-id": eni_id}
        )[0]

    launched = vpc_conn.run_instances(
        EXAMPLE_AMI_ID, security_group_ids=[instance_group.id]
    )
    server = launched.instances[0]
    eni = vpc_conn.create_network_interface(subnet.id, groups=[eni_group.id])

    # Initial state: one interface on the instance, ENI has only its own group.
    server.interfaces.should.have.length_of(1)
    eni.groups.should.have.length_of(1)
    group_ids(eni).should.equal(eni_only_ids)

    # Attach: dry run first, then for real.
    with pytest.raises(EC2ResponseError) as dry_run:
        vpc_conn.attach_network_interface(
            eni.id, server.id, device_index=1, dry_run=True
        )
    dry_run.value.error_code.should.equal("DryRunOperation")
    dry_run.value.status.should.equal(400)
    dry_run.value.message.should.equal(
        "An error occurred (DryRunOperation) when calling the AttachNetworkInterface operation: Request would have succeeded, but DryRun flag is set"
    )

    vpc_conn.attach_network_interface(eni.id, server.id, device_index=1)

    # Attached state: second interface present, groups merged on both views.
    server.update()
    server.interfaces.should.have.length_of(2)
    attached_eni = server.interfaces[1]
    attached_eni.id.should.equal(eni.id)
    attached_eni.groups.should.have.length_of(2)
    group_ids(attached_eni).should.equal(merged_ids)

    eni = fetch_eni(eni.id)
    eni.groups.should.have.length_of(2)
    group_ids(eni).should.equal(merged_ids)

    # Detach: dry run first, then for real.
    with pytest.raises(EC2ResponseError) as dry_run:
        vpc_conn.detach_network_interface(attached_eni.attachment.id, dry_run=True)
    dry_run.value.error_code.should.equal("DryRunOperation")
    dry_run.value.status.should.equal(400)
    dry_run.value.message.should.equal(
        "An error occurred (DryRunOperation) when calling the DetachNetworkInterface operation: Request would have succeeded, but DryRun flag is set"
    )

    vpc_conn.detach_network_interface(attached_eni.attachment.id)

    # Detached state: back to one interface, ENI has only its own group again.
    server.update()
    server.interfaces.should.have.length_of(1)

    eni = fetch_eni(eni.id)
    eni.groups.should.have.length_of(1)
    group_ids(eni).should.equal(eni_only_ids)

    # Detaching with an unknown attachment id must raise.
    with pytest.raises(EC2ResponseError) as not_found:
        vpc_conn.detach_network_interface("eni-attach-1234abcd")
    not_found.value.code.should.equal("InvalidAttachmentID.NotFound")
    not_found.value.status.should.equal(400)
    not_found.value.request_id.should_not.be.none
@mock_ec2_deprecated
def test_ec2_classic_has_public_ip_address():
    """An instance launched without a subnet has public and private addresses.

    Each DNS name is expected to embed its IP address with dots replaced by
    dashes.
    """
    ec2 = boto.connect_ec2("the_key", "the_secret")
    launched = ec2.run_instances(EXAMPLE_AMI_ID, key_name="keypair_name")
    server = launched.instances[0]

    server.ip_address.should_not.equal(None)
    dashed_public_ip = server.ip_address.replace(".", "-")
    server.public_dns_name.should.contain(dashed_public_ip)

    server.private_ip_address.should_not.equal(None)
    dashed_private_ip = server.private_ip_address.replace(".", "-")
    server.private_dns_name.should.contain(dashed_private_ip)
@mock_ec2_deprecated
def test_run_instance_with_keypair():
    """The key pair name passed at launch is reported on the instance."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    launched = ec2.run_instances(EXAMPLE_AMI_ID, key_name="keypair_name")
    launched.instances[0].key_name.should.equal("keypair_name")
@mock_ec2
def test_run_instance_with_block_device_mappings():
    """A block device mapping with VolumeSize creates a volume of that size."""
    client = boto3.client("ec2", region_name="us-east-1")
    client.run_instances(
        MinCount=1,
        MaxCount=1,
        ImageId=EXAMPLE_AMI_ID,
        KeyName="the_key",
        InstanceType="t1.micro",
        BlockDeviceMappings=[{"DeviceName": "/dev/sda2", "Ebs": {"VolumeSize": 50}}],
    )

    # Look up the volume through the instance's first block device mapping.
    described = client.describe_instances()
    first_instance = described["Reservations"][0]["Instances"][0]
    ebs_mapping = first_instance["BlockDeviceMappings"][0]["Ebs"]

    found = client.describe_volumes(VolumeIds=[ebs_mapping["VolumeId"]])
    found["Volumes"][0]["Size"].should.equal(50)
@mock_ec2
def test_run_instance_with_block_device_mappings_missing_ebs():
    """A block device mapping without an ``Ebs`` entry is rejected."""
    client = boto3.client("ec2", region_name="us-east-1")

    with pytest.raises(ClientError) as raised:
        client.run_instances(
            MinCount=1,
            MaxCount=1,
            ImageId=EXAMPLE_AMI_ID,
            KeyName="the_key",
            InstanceType="t1.micro",
            BlockDeviceMappings=[{"DeviceName": "/dev/sda2"}],
        )

    failure = raised.value.response
    failure["Error"]["Code"].should.equal("MissingParameter")
    failure["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
    failure["Error"]["Message"].should.equal(
        "The request must contain the parameter ebs"
    )
@mock_ec2
def test_run_instance_with_block_device_mappings_missing_size():
    """An ``Ebs`` mapping with neither a size nor a snapshot id is rejected."""
    client = boto3.client("ec2", region_name="us-east-1")
    incomplete_mapping = {"DeviceName": "/dev/sda2", "Ebs": {"VolumeType": "standard"}}

    with pytest.raises(ClientError) as raised:
        client.run_instances(
            MinCount=1,
            MaxCount=1,
            ImageId=EXAMPLE_AMI_ID,
            KeyName="the_key",
            InstanceType="t1.micro",
            BlockDeviceMappings=[incomplete_mapping],
        )

    failure = raised.value.response
    failure["Error"]["Code"].should.equal("MissingParameter")
    failure["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
    failure["Error"]["Message"].should.equal(
        "The request must contain the parameter size or snapshotId"
    )
@mock_ec2
def test_run_instance_with_block_device_mappings_from_snapshot():
    """A mapping given only a snapshot id takes size and snapshot from it."""
    client = boto3.client("ec2", region_name="us-east-1")
    resource = boto3.resource("ec2", region_name="us-east-1")

    # Create a 30 GiB volume and snapshot it to obtain a snapshot id.
    source_volume = resource.create_volume(AvailabilityZone="1a", Size=30)
    snapshot = source_volume.create_snapshot()

    client.run_instances(
        MinCount=1,
        MaxCount=1,
        ImageId=EXAMPLE_AMI_ID,
        KeyName="the_key",
        InstanceType="t1.micro",
        BlockDeviceMappings=[
            {"DeviceName": "/dev/sda2", "Ebs": {"SnapshotId": snapshot.snapshot_id}}
        ],
    )

    # Look up the volume through the instance's first block device mapping.
    described = client.describe_instances()
    first_instance = described["Reservations"][0]["Instances"][0]
    ebs_mapping = first_instance["BlockDeviceMappings"][0]["Ebs"]

    found = client.describe_volumes(VolumeIds=[ebs_mapping["VolumeId"]])
    found["Volumes"][0]["Size"].should.equal(30)
    found["Volumes"][0]["SnapshotId"].should.equal(snapshot.snapshot_id)
@mock_ec2_deprecated
def test_describe_instance_status_no_instances():
    """With nothing launched, the instance status listing is empty."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    statuses = ec2.get_all_instance_status()
    status_count = len(statuses)
    status_count.should.equal(0)
@mock_ec2_deprecated
def test_describe_instance_status_with_instances():
    """A launched instance reports "ok" instance and system status."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    ec2.run_instances(EXAMPLE_AMI_ID, key_name="keypair_name")

    statuses = ec2.get_all_instance_status()
    len(statuses).should.equal(1)

    only_status = statuses[0]
    only_status.instance_status.status.should.equal("ok")
    only_status.system_status.status.should.equal("ok")
@mock_ec2_deprecated
def test_describe_instance_status_with_instance_filter_deprecated():
    """Restrict the status listing to given instance ids (boto API).

    Also checks that including an unknown instance id raises an error.
    """
    ec2 = boto.connect_ec2("the_key", "the_secret")

    # The instance whose status is requested explicitly.
    target = ec2.run_instances(EXAMPLE_AMI_ID, key_name="keypair_name").instances[0]

    # A second instance that the id filter must leave out.
    ec2.run_instances(EXAMPLE_AMI_ID, key_name="keypair_name")

    statuses = ec2.get_all_instance_status(instance_ids=[target.id])
    len(statuses).should.equal(1)
    statuses[0].id.should.equal(target.id)

    # Mixing in an id that does not exist makes the whole call fail.
    with pytest.raises(EC2ResponseError) as not_found:
        ec2.get_all_instance_status(instance_ids=[target.id, "i-1234abcd"])
    not_found.value.code.should.equal("InvalidInstanceID.NotFound")
    not_found.value.status.should.equal(400)
    not_found.value.request_id.should_not.be.none
@mock_ec2
def test_describe_instance_credit_specifications():
    """The credit specification listing echoes the requested instance id."""
    client = boto3.client("ec2", region_name="us-west-1")

    launched = client.run_instances(ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1)
    instance_id = launched["Instances"][0]["InstanceId"]

    result = client.describe_instance_credit_specifications(
        InstanceIds=[instance_id]
    )
    reported_id = result["InstanceCreditSpecifications"][0]["InstanceId"]
    assert reported_id == instance_id
@mock_ec2
def test_describe_instance_status_with_instance_filter():
    """Filter instance statuses by state name and by state code (boto3 API).

    Three instances are launched and the first one is stopped. Every filter
    is then checked against the sorted list of instance ids it should match.
    """
    client = boto3.client("ec2", region_name="us-west-1")

    launched = client.run_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=3, MaxCount=3
    )["Instances"]
    first_id = launched[0]["InstanceId"]
    second_id = launched[1]["InstanceId"]
    third_id = launched[2]["InstanceId"]

    client.stop_instances(InstanceIds=[first_id])
    stopped_ids = [first_id]
    running_ids = sorted([second_id, third_id])
    all_ids = sorted(stopped_ids + running_ids)

    def matching_ids(filter_name, values):
        # Sorted instance ids of all statuses matching a single filter.
        found = client.describe_instance_status(
            IncludeAllInstances=True,
            Filters=[{"Name": filter_name, "Values": values}],
        )["InstanceStatuses"]
        return sorted(status["InstanceId"] for status in found)

    # Filter on the state name.
    matching_ids("instance-state-name", ["running", "stopped"]).should.equal(all_ids)
    matching_ids("instance-state-name", ["running"]).should.equal(running_ids)
    matching_ids("instance-state-name", ["stopped"]).should.equal(stopped_ids)

    # Filter on the state code: "16" is used for running, "80" for stopped.
    matching_ids("instance-state-code", ["16", "80"]).should.equal(all_ids)
    matching_ids("instance-state-code", ["16"]).should.equal(running_ids)
    matching_ids("instance-state-code", ["80"]).should.equal(stopped_ids)
@requires_boto_gte("2.32.0")
@mock_ec2_deprecated
def test_describe_instance_status_with_non_running_instances():
    """Stopped and terminated instances only show with ``include_all_instances``."""
    ec2 = boto.connect_ec2("the_key", "the_secret")
    launched = ec2.run_instances(EXAMPLE_AMI_ID, min_count=3)
    to_stop, to_terminate, kept_running = launched.instances
    to_stop.stop()
    to_terminate.terminate()

    # The default listing covers the running instance only.
    running_statuses = ec2.get_all_instance_status()
    running_statuses.should.have.length_of(1)
    running_statuses[0].id.should.equal(kept_running.id)
    running_statuses[0].state_name.should.equal("running")

    # Asking for all instances returns one status per instance.
    every_status = ec2.get_all_instance_status(include_all_instances=True)
    every_status.should.have.length_of(3)

    def status_of(server):
        # First status entry for the given instance, or None when absent.
        return next((entry for entry in every_status if entry.id == server.id), None)

    status_of(to_stop).state_name.should.equal("stopped")
    status_of(to_terminate).state_name.should.equal("terminated")
    status_of(kept_running).state_name.should.equal("running")
@mock_ec2_deprecated
def test_get_instance_by_security_group():
    """After assigning a group via ``groupSet``, the group lists the instance.

    The assignment is tried as a dry run first, which is expected to raise.
    """
    ec2 = boto.connect_ec2("the_key", "the_secret")
    ec2.run_instances(EXAMPLE_AMI_ID)
    server = ec2.get_only_instances()[0]
    group = ec2.create_security_group("test", "test")

    with pytest.raises(EC2ResponseError) as dry_run:
        ec2.modify_instance_attribute(
            server.id, "groupSet", [group.id], dry_run=True
        )
    error = dry_run.value
    error.error_code.should.equal("DryRunOperation")
    error.status.should.equal(400)
    error.message.should.equal(
        "An error occurred (DryRunOperation) when calling the ModifyInstanceSecurityGroups operation: Request would have succeeded, but DryRun flag is set"
    )

    ec2.modify_instance_attribute(server.id, "groupSet", [group.id])

    members = group.instances()
    assert len(members) == 1
    assert members[0].id == server.id
@mock_ec2
def test_modify_delete_on_termination():
    """``DeleteOnTermination`` starts out True and can be switched to False."""
    ec2_resource = boto3.resource("ec2", region_name="us-west-1")
    instance = ec2_resource.create_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1
    )[0]

    def delete_on_termination():
        # Reload the instance and read the flag of its first block device.
        instance.load()
        return instance.block_device_mappings[0]["Ebs"]["DeleteOnTermination"]

    delete_on_termination().should.be(True)

    instance.modify_attribute(
        BlockDeviceMappings=[
            {"DeviceName": "/dev/sda1", "Ebs": {"DeleteOnTermination": False}}
        ]
    )
    delete_on_termination().should.be(False)
@mock_ec2
def test_create_instance_ebs_optimized():
    """``EbsOptimized`` is honoured at launch, can be modified, and defaults to False."""
    resource = boto3.resource("ec2", region_name="eu-west-1")

    def reloaded_ebs_optimized(target):
        # Reload the instance and return its current ebs_optimized value.
        target.load()
        return target.ebs_optimized

    # Explicitly enabled at launch, then switched off again.
    optimized = resource.create_instances(
        ImageId=EXAMPLE_AMI_ID, MaxCount=1, MinCount=1, EbsOptimized=True
    )[0]
    reloaded_ebs_optimized(optimized).should.be(True)

    optimized.modify_attribute(EbsOptimized={"Value": False})
    reloaded_ebs_optimized(optimized).should.be(False)

    # Launched without the flag at all.
    plain = resource.create_instances(
        ImageId=EXAMPLE_AMI_ID, MaxCount=1, MinCount=1,
    )[0]
    reloaded_ebs_optimized(plain).should.be(False)
@mock_ec2
def test_run_multiple_instances_in_same_command():
    """Instances launched by one call share a reservation with sequential indexes.

    Launches four instances with a single ``run_instances`` call and checks
    that the first reservation holds all of them, with ``AmiLaunchIndex``
    counting up from zero in listing order.
    """
    instance_count = 4
    client = boto3.client("ec2", region_name="us-east-1")
    client.run_instances(
        ImageId=EXAMPLE_AMI_ID, MinCount=instance_count, MaxCount=instance_count
    )

    reservations = client.describe_instances()["Reservations"]
    instances = reservations[0]["Instances"]
    instances.should.have.length_of(instance_count)

    for expected_index, instance in enumerate(instances):
        # Compare by value: an identity check (`should.be`) on integers only
        # passes because CPython happens to cache small ints.
        instance["AmiLaunchIndex"].should.equal(expected_index)
@mock_ec2
def test_describe_instance_attribute():
    """``describe_instance_attribute`` accepts known names and rejects others.

    Every supported attribute name must be accepted; ``groupSet`` and
    ``userData`` additionally have their payload checked. Unknown or
    wrongly-cased names must fail with InvalidParameterValue.
    """
    client = boto3.client("ec2", region_name="us-east-1")
    group_id = client.create_security_group(
        GroupName="test security group", Description="this is a test security group"
    )["GroupId"]
    client.run_instances(
        ImageId=EXAMPLE_AMI_ID,
        MinCount=1,
        MaxCount=1,
        SecurityGroupIds=[group_id],
    )
    described = client.describe_instances()
    instance_id = described["Reservations"][0]["Instances"][0]["InstanceId"]

    supported_attributes = [
        "instanceType",
        "kernel",
        "ramdisk",
        "userData",
        "disableApiTermination",
        "instanceInitiatedShutdownBehavior",
        "rootDeviceName",
        "blockDeviceMapping",
        "productCodes",
        "sourceDestCheck",
        "groupSet",
        "ebsOptimized",
        "sriovNetSupport",
    ]

    for attribute in supported_attributes:
        response = client.describe_instance_attribute(
            InstanceId=instance_id, Attribute=attribute
        )
        if attribute == "groupSet":
            response.should.have.key("Groups")
            response["Groups"].should.have.length_of(1)
            response["Groups"][0]["GroupId"].should.equal(group_id)
        elif attribute == "userData":
            response.should.have.key("UserData")
            response["UserData"].should.be.empty

    # Unknown names, plus known names written with the wrong letter case.
    rejected_attributes = [
        "abc",
        "Kernel",
        "RamDisk",
        "userdata",
        "iNsTaNcEtYpE",
    ]
    message_template = "Value ({invalid_instance_attribute}) for parameter attribute is invalid. Unknown attribute."

    for attribute in rejected_attributes:
        with pytest.raises(ClientError) as raised:
            client.describe_instance_attribute(
                InstanceId=instance_id, Attribute=attribute
            )
        failure = raised.value.response
        failure["Error"]["Code"].should.equal("InvalidParameterValue")
        failure["ResponseMetadata"]["HTTPStatusCode"].should.equal(400)
        failure["Error"]["Message"].should.equal(
            message_template.format(invalid_instance_attribute=attribute)
        )
@mock_ec2
def test_warn_on_invalid_ami():
    """Creating an instance from an unknown AMI id emits a PendingDeprecationWarning.

    Skipped when ``settings.TEST_SERVER_MODE`` is set, because the warning
    cannot be captured there.
    """
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Can't capture warnings in server mode.")

    resource = boto3.resource("ec2", "us-east-1")
    expected_warning = r"Could not find AMI with image-id:invalid-ami.+"
    with pytest.warns(PendingDeprecationWarning, match=expected_warning):
        resource.create_instances(ImageId="invalid-ami", MinCount=1, MaxCount=1)