"""Tests for creating, describing and deleting EFS mount targets."""

import re
from ipaddress import IPv4Network

import pytest
from botocore.exceptions import ClientError

from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from tests.test_efs.junk_drawer import has_status_code

from . import fixture_ec2, fixture_efs  # noqa # pylint: disable=unused-import


@pytest.fixture(scope="function", name="file_system")
def fixture_file_system(efs):
    """Create a file system and yield the creation response minus its metadata."""
    create_fs_resp = efs.create_file_system(CreationToken="foobarbaz")
    create_fs_resp.pop("ResponseMetadata")
    yield create_fs_resp


@pytest.fixture(scope="function", name="subnet")
def fixture_subnet(ec2):
    """Yield the first subnet returned by ``ec2.describe_subnets()``."""
    desc_sn_resp = ec2.describe_subnets()
    subnet = desc_sn_resp["Subnets"][0]
    yield subnet


def test_create_mount_target_minimal_correct_use(efs, file_system, subnet):
    """Creating a mount target with only the required arguments succeeds."""
    subnet_id = subnet["SubnetId"]
    file_system_id = file_system["FileSystemId"]

    # Create the mount target.
    create_mt_resp = efs.create_mount_target(
        FileSystemId=file_system_id, SubnetId=subnet_id
    )

    # Check the mount target response code.
    resp_metadata = create_mt_resp.pop("ResponseMetadata")
    assert resp_metadata["HTTPStatusCode"] == 200

    # Check the mount target response body.
    assert re.match(r"^fsmt-[a-f0-9]+$", create_mt_resp["MountTargetId"])
    assert re.match(r"^eni-[a-f0-9]+$", create_mt_resp["NetworkInterfaceId"])
    assert create_mt_resp["AvailabilityZoneId"] == subnet["AvailabilityZoneId"]
    assert create_mt_resp["AvailabilityZoneName"] == subnet["AvailabilityZone"]
    assert create_mt_resp["VpcId"] == subnet["VpcId"]
    assert create_mt_resp["SubnetId"] == subnet_id
    assert IPv4Network(create_mt_resp["IpAddress"]).subnet_of(
        IPv4Network(subnet["CidrBlock"])
    )
    assert create_mt_resp["FileSystemId"] == file_system_id
    assert create_mt_resp["OwnerId"] == ACCOUNT_ID
    assert create_mt_resp["LifeCycleState"] == "available"

    # Check that the number of mount targets in the fs is correct.
    desc_fs_resp = efs.describe_file_systems()
    described_fs = desc_fs_resp["FileSystems"][0]
    assert described_fs["NumberOfMountTargets"] == 1


def test_create_mount_target_aws_sample_2(efs, ec2, file_system, subnet):
    """Creating a mount target with an explicit IP and security group succeeds."""
    subnet_id = subnet["SubnetId"]
    file_system_id = file_system["FileSystemId"]

    # Pick the first usable host address of the subnet.  ``hosts()`` may return
    # either an iterator or a list, so normalise it with ``iter`` first.
    subnet_network = IPv4Network(subnet["CidrBlock"])
    first_host = next(iter(subnet_network.hosts()), None)
    if first_host is None:
        # Raised explicitly (rather than ``assert False``) so the check is not
        # stripped when Python runs with ``-O``.
        raise AssertionError(
            f"Could not generate an IP address from CIDR block: {subnet['CidrBlock']}"
        )
    ip_addr = first_host.exploded

    desc_sg_resp = ec2.describe_security_groups()
    security_group = desc_sg_resp["SecurityGroups"][0]
    security_group_id = security_group["GroupId"]

    # Make sure nothing chokes.
    sample_input = {
        "FileSystemId": file_system_id,
        "SubnetId": subnet_id,
        "IpAddress": ip_addr,
        "SecurityGroups": [security_group_id],
    }
    create_mt_resp = efs.create_mount_target(**sample_input)

    # Check the mount target response code.
    resp_metadata = create_mt_resp.pop("ResponseMetadata")
    assert resp_metadata["HTTPStatusCode"] == 200

    # Check that setting the IP Address worked.
    assert create_mt_resp["IpAddress"] == ip_addr


def test_create_mount_target_invalid_file_system_id(efs, subnet):
    """An unknown file system id yields a 404 ``FileSystemNotFound`` error."""
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(FileSystemId="fs-12343289", SubnetId=subnet["SubnetId"])
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert resp["Error"]["Code"] == "FileSystemNotFound"


def test_create_mount_target_invalid_subnet_id(efs, file_system):
    """An unknown subnet id yields a 404 ``SubnetNotFound`` error."""
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"], SubnetId="subnet-12345678"
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert resp["Error"]["Code"] == "SubnetNotFound"


def test_create_mount_target_invalid_sg_id(efs, file_system, subnet):
    """An unknown security group id yields a 404 ``SecurityGroupNotFound`` error."""
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet["SubnetId"],
            SecurityGroups=["sg-1234df235"],
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert resp["Error"]["Code"] == "SecurityGroupNotFound"


def test_create_second_mount_target_wrong_vpc(efs, ec2, file_system, subnet):
    """A second mount target in a subnet of a different VPC is a 409 conflict."""
    vpc_info = ec2.create_vpc(CidrBlock="10.1.0.0/16")
    new_subnet_info = ec2.create_subnet(
        VpcId=vpc_info["Vpc"]["VpcId"], CidrBlock="10.1.1.0/24"
    )
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=new_subnet_info["Subnet"]["SubnetId"],
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert resp["Error"]["Code"] == "MountTargetConflict"
    assert "VPC" in resp["Error"]["Message"]


def test_create_mount_target_duplicate_subnet_id(efs, file_system, subnet):
    """Creating two mount targets in the same subnet is a 409 conflict."""
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert resp["Error"]["Code"] == "MountTargetConflict"
    assert "AZ" in resp["Error"]["Message"]


def test_create_mount_target_subnets_in_same_zone(efs, ec2, file_system, subnet):
    """A second mount target in another subnet of the same AZ is a 409 conflict."""
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    subnet_info = ec2.create_subnet(
        VpcId=subnet["VpcId"],
        CidrBlock="172.31.96.0/20",
        AvailabilityZone=subnet["AvailabilityZone"],
    )
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet_info["Subnet"]["SubnetId"],
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert resp["Error"]["Code"] == "MountTargetConflict"
    assert "AZ" in resp["Error"]["Message"]


def test_create_mount_target_ip_address_out_of_range(efs, file_system, subnet):
    """An explicit IP address that is rejected yields a 400 ``BadRequest`` error.

    NOTE(review): "10.0.1.0" is presumably outside the fixture subnet's CIDR
    block; the subnet's range is not visible in this module.
    """
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet["SubnetId"],
            IpAddress="10.0.1.0",
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 400)
    assert resp["Error"]["Code"] == "BadRequest"
    assert "Address" in resp["Error"]["Message"]


def test_create_mount_target_too_many_security_groups(efs, ec2, file_system, subnet):
    """Passing six security groups yields ``SecurityGroupLimitExceeded``."""
    sg_id_list = []
    for i in range(6):
        sg_info = ec2.create_security_group(
            VpcId=subnet["VpcId"],
            GroupName=f"sg-{i}",
            Description=f"SG-{i} protects us from the Goa'uld.",
        )
        sg_id_list.append(sg_info["GroupId"])
    with pytest.raises(ClientError) as exc_info:
        efs.create_mount_target(
            FileSystemId=file_system["FileSystemId"],
            SubnetId=subnet["SubnetId"],
            SecurityGroups=sg_id_list,
        )
    resp = exc_info.value.response
    assert has_status_code(resp, 400)
    assert resp["Error"]["Code"] == "SecurityGroupLimitExceeded"


def test_delete_file_system_mount_targets_attached(
    efs, ec2, file_system, subnet
):  # pylint: disable=unused-argument
    """Deleting a file system that still has a mount target is a 409 error."""
    efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    with pytest.raises(ClientError) as exc_info:
        efs.delete_file_system(FileSystemId=file_system["FileSystemId"])
    resp = exc_info.value.response
    assert has_status_code(resp, 409)
    assert resp["Error"]["Code"] == "FileSystemInUse"


def test_describe_mount_targets_minimal_case(
    efs, ec2, file_system, subnet
):  # pylint: disable=unused-argument
    """Describing by file system id returns the single created mount target."""
    create_resp = efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    create_resp.pop("ResponseMetadata")

    # Describe the mount targets
    desc_mt_resp = efs.describe_mount_targets(FileSystemId=file_system["FileSystemId"])
    desc_mt_resp_metadata = desc_mt_resp.pop("ResponseMetadata")
    assert desc_mt_resp_metadata["HTTPStatusCode"] == 200

    # Check the list results.
    mt_list = desc_mt_resp["MountTargets"]
    assert len(mt_list) == 1
    mount_target = mt_list[0]
    assert mount_target["MountTargetId"] == create_resp["MountTargetId"]

    # The description must be identical to the (metadata-free) create response.
    assert mount_target == create_resp


def test_describe_mount_targets__by_access_point_id(
    efs, ec2, file_system, subnet
):  # pylint: disable=unused-argument
    """Describing by access point id returns the file system's mount target."""
    create_resp = efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    create_resp.pop("ResponseMetadata")

    ap_resp = efs.create_access_point(
        ClientToken="ct1", FileSystemId=file_system["FileSystemId"]
    )
    access_point_id = ap_resp["AccessPointId"]

    # Describe the mount targets
    desc_mt_resp = efs.describe_mount_targets(AccessPointId=access_point_id)

    # Check the list results.
    described = desc_mt_resp["MountTargets"]
    assert len(described) == 1
    assert described[0]["MountTargetId"] == create_resp["MountTargetId"]


def test_describe_mount_targets_paging(efs, ec2, file_system):
    """Paging with ``MaxItems``/``Marker`` returns disjoint pages of mount targets."""
    fs_id = file_system["FileSystemId"]

    # Get a list of subnets.
    subnet_list = ec2.describe_subnets()["Subnets"]

    # Create several mount targets.
    for subnet in subnet_list:
        efs.create_mount_target(FileSystemId=fs_id, SubnetId=subnet["SubnetId"])

    # First call (Start)
    # ------------------

    # Call the tested function
    resp1 = efs.describe_mount_targets(FileSystemId=fs_id, MaxItems=2)

    # Check the response status
    assert has_status_code(resp1, 200)

    # Check content of the result.
    resp1.pop("ResponseMetadata")
    assert set(resp1.keys()) == {"NextMarker", "MountTargets"}
    assert len(resp1["MountTargets"]) == 2
    mt_id_set_1 = {mt["MountTargetId"] for mt in resp1["MountTargets"]}

    # Second call (Middle)
    # --------------------

    # Get the next marker.
    resp2 = efs.describe_mount_targets(
        FileSystemId=fs_id, MaxItems=2, Marker=resp1["NextMarker"]
    )

    # Check the response status
    resp2_metadata = resp2.pop("ResponseMetadata")
    assert resp2_metadata["HTTPStatusCode"] == 200

    # Check the response contents.
    assert set(resp2.keys()) == {"NextMarker", "MountTargets", "Marker"}
    assert len(resp2["MountTargets"]) == 2
    assert resp2["Marker"] == resp1["NextMarker"]
    mt_id_set_2 = {mt["MountTargetId"] for mt in resp2["MountTargets"]}
    assert mt_id_set_1 & mt_id_set_2 == set()

    # Third call (End)
    # ----------------

    # Get the last marker results
    resp3 = efs.describe_mount_targets(
        FileSystemId=fs_id, MaxItems=20, Marker=resp2["NextMarker"]
    )

    # Check the response status
    resp3_metadata = resp3.pop("ResponseMetadata")
    assert resp3_metadata["HTTPStatusCode"] == 200

    # Check the response contents.
    assert set(resp3.keys()) == {"MountTargets", "Marker"}
    assert resp3["Marker"] == resp2["NextMarker"]
    mt_id_set_3 = {mt["MountTargetId"] for mt in resp3["MountTargets"]}
    assert mt_id_set_3 & (mt_id_set_1 | mt_id_set_2) == set()


def test_describe_mount_targets_invalid_file_system_id(efs):
    """Describing by an unknown file system id is a 404 ``FileSystemNotFound``."""
    with pytest.raises(ClientError) as exc_info:
        efs.describe_mount_targets(FileSystemId="fs-12343289")
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert resp["Error"]["Code"] == "FileSystemNotFound"


def test_describe_mount_targets_invalid_mount_target_id(efs):
    """Describing by an unknown mount target id is a 404 ``MountTargetNotFound``."""
    with pytest.raises(ClientError) as exc_info:
        efs.describe_mount_targets(MountTargetId="fsmt-ad9f8987")
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert resp["Error"]["Code"] == "MountTargetNotFound"


def test_describe_mount_targets_no_id_given(efs):
    """Describing without any identifier is a 400 ``BadRequest``."""
    with pytest.raises(ClientError) as exc_info:
        efs.describe_mount_targets()
    resp = exc_info.value.response
    assert has_status_code(resp, 400)
    assert resp["Error"]["Code"] == "BadRequest"


def test_delete_mount_target_minimal_case(efs, file_system, subnet):
    """Deleting the only mount target leaves the file system with none."""
    mt_info = efs.create_mount_target(
        FileSystemId=file_system["FileSystemId"], SubnetId=subnet["SubnetId"]
    )
    resp = efs.delete_mount_target(MountTargetId=mt_info["MountTargetId"])
    assert has_status_code(resp, 204)
    desc_resp = efs.describe_mount_targets(FileSystemId=file_system["FileSystemId"])
    assert len(desc_resp["MountTargets"]) == 0


def test_delete_mount_target_invalid_mount_target_id(efs):
    """Deleting an unknown mount target id is a 404 ``MountTargetNotFound``."""
    with pytest.raises(ClientError) as exc_info:
        efs.delete_mount_target(MountTargetId="fsmt-98487aef0a7")
    resp = exc_info.value.response
    assert has_status_code(resp, 404)
    assert resp["Error"]["Code"] == "MountTargetNotFound"