import re
from os import environ

import pytest

from moto import mock_efs, mock_ec2
import moto.server as server

# Base paths of the EFS REST API (2015-02-01 version) exercised below.
FILE_SYSTEMS = "/2015-02-01/file-systems"
MOUNT_TARGETS = "/2015-02-01/mount-targets"


@pytest.fixture(scope="function")
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    environ["AWS_ACCESS_KEY_ID"] = "testing"
    environ["AWS_SESSION_TOKEN"] = "testing"
    environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    environ["AWS_SECURITY_TOKEN"] = "testing"


@pytest.fixture(scope="function")
def efs_client(aws_credentials):  # pylint: disable=unused-argument
    """Yield a test client for the moto EFS backend app, inside ``mock_efs``."""
    with mock_efs():
        yield server.create_backend_app("efs").test_client()


@pytest.fixture(scope="function")
def subnet_id(aws_credentials):  # pylint: disable=unused-argument
    """Yield the ID of the first subnet reported by the mocked EC2 backend.

    The ID is scraped from the XML body of a ``DescribeSubnets`` call made
    against the moto EC2 backend app.
    """
    with mock_ec2():
        ec2_client = server.create_backend_app("ec2").test_client()
        resp = ec2_client.get("/?Action=DescribeSubnets")
        # The pattern must be anchored on the <subnetId> element: a bare lazy
        # group "(.*?)" matches the empty string at every position, which
        # would make this fixture always yield "".
        subnet_ids = re.findall(
            r"<subnetId>(.*?)</subnetId>", resp.data.decode("utf-8")
        )
        # Fail with a readable message instead of an IndexError below.
        assert subnet_ids, "DescribeSubnets response contained no <subnetId>"
        yield subnet_ids[0]


@pytest.fixture(scope="function")
def file_system_id(efs_client):
    """Create a file system through the EFS test client and yield its ID."""
    resp = efs_client.post(
        FILE_SYSTEMS, json={"CreationToken": "foobarbaz", "Backup": True}
    )
    yield resp.json["FileSystemId"]


"""
Test each method currently supported to ensure it connects via the server.

NOTE: this does NOT test whether the endpoints are really functioning, just
that they connect and respond. Tests of functionality are contained in
`test_file_system` and `test_mount_target`.
"""


def test_efs_file_system_create(efs_client):
    """POST to the file-systems endpoint responds with 201."""
    res = efs_client.post(FILE_SYSTEMS, json={"CreationToken": "2398asdfkajsdf"})
    assert res.status_code == 201


def test_efs_file_system_describe(efs_client):
    """GET on the file-systems endpoint responds with 200."""
    res = efs_client.get(FILE_SYSTEMS)
    assert res.status_code == 200


def test_efs_file_system_delete(file_system_id, efs_client):
    """DELETE of an existing file system responds with 204."""
    res = efs_client.delete(f"{FILE_SYSTEMS}/{file_system_id}")
    assert res.status_code == 204


def test_efs_mount_target_create(file_system_id, subnet_id, efs_client):
    """POST to the mount-targets endpoint responds with 200."""
    res = efs_client.post(
        MOUNT_TARGETS,
        json={"FileSystemId": file_system_id, "SubnetId": subnet_id},
    )
    assert res.status_code == 200


def test_efs_mount_target_describe(file_system_id, efs_client):
    """GET on mount-targets filtered by file system responds with 200."""
    res = efs_client.get(f"{MOUNT_TARGETS}?FileSystemId={file_system_id}")
    assert res.status_code == 200


def test_efs_mount_target_delete(file_system_id, subnet_id, efs_client):
    """DELETE of a freshly created mount target responds with 204."""
    create_res = efs_client.post(
        MOUNT_TARGETS,
        json={"FileSystemId": file_system_id, "SubnetId": subnet_id},
    )
    mt_id = create_res.json["MountTargetId"]
    res = efs_client.delete(f"{MOUNT_TARGETS}/{mt_id}")
    assert res.status_code == 204


def test_efs_describe_backup_policy(file_system_id, efs_client):
    """GET on a file system's backup-policy endpoint responds with 200."""
    res = efs_client.get(f"{FILE_SYSTEMS}/{file_system_id}/backup-policy")
    assert res.status_code == 200