153 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			153 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import pytest
 | |
| from botocore.exceptions import ClientError
 | |
| 
 | |
| from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
 | |
| 
 | |
| from . import fixture_efs  # noqa # pylint: disable=unused-import
 | |
| 
 | |
| 
 | |
@pytest.fixture(scope="function", name="file_system")
def fixture_file_system(efs):
    """Yield the description of a file system created for the current test.

    The file system is created through the ``efs`` fixture with the creation
    token ``"foobarbaz"``. The ``ResponseMetadata`` entry is removed from the
    response before it is handed to the test.
    """
    file_system_description = efs.create_file_system(CreationToken="foobarbaz")
    # Drop the transport metadata so only the description itself remains.
    del file_system_description["ResponseMetadata"]
    yield file_system_description
 | |
| 
 | |
| 
 | |
def test_describe_access_points__initial(efs):
    """Describing access points before creating any returns an empty list."""
    access_points = efs.describe_access_points()["AccessPoints"]
    assert access_points == []
 | |
| 
 | |
| 
 | |
def test_create_access_point__simple(efs, file_system):
    """Create an access point with only a client token and a file system id.

    Checks that the response echoes the token and file system id, carries an
    id and an ARN, and reports the owner, lifecycle state and root directory
    values asserted below.
    """
    file_system_id = file_system["FileSystemId"]
    created = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)

    # Generated identifiers only need to be present in the response.
    for generated_key in ("AccessPointId", "AccessPointArn"):
        assert generated_key in created

    expected_fields = {
        "ClientToken": "ct",
        "FileSystemId": file_system_id,
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
        "RootDirectory": {"Path": "/"},
    }
    for field, expected in expected_fields.items():
        assert created[field] == expected
 | |
| 
 | |
| 
 | |
def test_create_access_point__full(efs, file_system):
    """Create an access point with tags, a POSIX user and a root directory.

    Checks that the response returns the supplied values unchanged, reports a
    ``Name`` of ``"myname"``, carries an id and an ARN, and has the owner and
    lifecycle state asserted below.
    """
    file_system_id = file_system["FileSystemId"]

    request = {
        "ClientToken": "ct",
        "Tags": [
            {"Key": "key", "Value": "value"},
            {"Key": "Name", "Value": "myname"},
        ],
        "FileSystemId": file_system_id,
        "PosixUser": {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
        "RootDirectory": {
            "Path": "/root/path",
            "CreationInfo": {
                "OwnerUid": 987,
                "OwnerGid": 986,
                "Permissions": "root_permissions",
            },
        },
    }
    created = efs.create_access_point(**request)

    # Generated identifiers only need to be present in the response.
    for generated_key in ("AccessPointId", "AccessPointArn"):
        assert generated_key in created

    # Expected values are spelled out independently of the request above.
    expected_fields = {
        "ClientToken": "ct",
        "Name": "myname",
        "Tags": [
            {"Key": "key", "Value": "value"},
            {"Key": "Name", "Value": "myname"},
        ],
        "FileSystemId": file_system_id,
        "PosixUser": {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
        "RootDirectory": {
            "Path": "/root/path",
            "CreationInfo": {
                "OwnerUid": 987,
                "OwnerGid": 986,
                "Permissions": "root_permissions",
            },
        },
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
    }
    for field, expected in expected_fields.items():
        assert created[field] == expected
 | |
| 
 | |
| 
 | |
def test_describe_access_point(efs, file_system):
    """Describe a single access point by id and check the returned fields.

    Creates one access point, looks it up with the ``AccessPointId`` filter
    and verifies that exactly one entry comes back, that it is the access
    point that was created, and that its token, file system id, owner and
    lifecycle state have the expected values.
    """
    fs_id = file_system["FileSystemId"]

    access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    resp = efs.describe_access_points(AccessPointId=access_point_id)
    assert len(resp["AccessPoints"]) == 1
    access_point = resp["AccessPoints"][0]

    assert access_point["ClientToken"] == "ct"
    # Check the id itself, not just that the key is present, so a lookup that
    # returned some other access point would fail the test.
    assert access_point["AccessPointId"] == access_point_id
    assert "AccessPointArn" in access_point
    assert access_point["FileSystemId"] == fs_id
    assert access_point["OwnerId"] == ACCOUNT_ID
    assert access_point["LifeCycleState"] == "available"
 | |
| 
 | |
| 
 | |
def test_describe_access_points__multiple(efs, file_system):
    """Two access points on one file system are both returned when listing."""
    file_system_id = file_system["FileSystemId"]

    # Each access point is created with its own client token.
    for client_token in ("ct1", "ct2"):
        efs.create_access_point(ClientToken=client_token, FileSystemId=file_system_id)

    listed = efs.describe_access_points()["AccessPoints"]
    assert len(listed) == 2
 | |
| 
 | |
| 
 | |
def test_describe_access_points__filters(efs):
    """Filtering by file system id or access point id returns only the match.

    Two file systems are created, each with one access point. The first
    lookup filters on the first file system's id, the second on the second
    access point's id. Each must return exactly one entry and no
    ``NextToken``.
    """
    first_fs = efs.create_file_system(CreationToken="foobarbaz")["FileSystemId"]
    second_fs = efs.create_file_system(CreationToken="bazbarfoo")["FileSystemId"]

    first_ap = efs.create_access_point(ClientToken="ct", FileSystemId=first_fs)[
        "AccessPointId"
    ]
    second_ap = efs.create_access_point(ClientToken="ct", FileSystemId=second_fs)[
        "AccessPointId"
    ]

    # (filter arguments, expected file system id, expected access point id)
    cases = (
        ({"FileSystemId": first_fs}, first_fs, first_ap),
        ({"AccessPointId": second_ap}, second_fs, second_ap),
    )
    for filter_kwargs, expected_fs, expected_ap in cases:
        resp = efs.describe_access_points(**filter_kwargs)
        matches = resp["AccessPoints"]
        assert len(matches) == 1
        assert resp.get("NextToken") is None
        assert matches[0]["FileSystemId"] == expected_fs
        assert matches[0]["AccessPointId"] == expected_ap
 | |
| 
 | |
| 
 | |
def test_delete_access_points(efs, file_system):
    """Deleting one access point removes only that access point.

    Creates two access points on the same file system, deletes the second,
    and verifies that the first is the only one still listed, that it can
    still be looked up by id, and that looking up the deleted one raises a
    ``ClientError`` with the code ``AccessPointNotFound``.
    """
    fs_id = file_system["FileSystemId"]

    ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
        "AccessPointId"
    ]
    ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    # Delete one access point
    efs.delete_access_point(AccessPointId=ap_id2)

    # We can only find one, and it must be the one that was not deleted
    remaining = efs.describe_access_points()["AccessPoints"]
    assert len(remaining) == 1
    assert remaining[0]["AccessPointId"] == ap_id1

    # The first one still exists and is returned by a lookup on its id
    survivors = efs.describe_access_points(AccessPointId=ap_id1)["AccessPoints"]
    assert len(survivors) == 1
    assert survivors[0]["AccessPointId"] == ap_id1

    # The second one is gone
    with pytest.raises(ClientError) as exc_info:
        efs.describe_access_points(AccessPointId=ap_id2)
    err = exc_info.value.response["Error"]
    assert err["Code"] == "AccessPointNotFound"
 |