152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
import pytest
|
|
from botocore.exceptions import ClientError
|
|
|
|
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
|
|
from . import fixture_efs # noqa # pylint: disable=unused-import
|
|
|
|
|
|
@pytest.fixture(scope="function", name="file_system")
def fixture_file_system(efs):
    """Create a file system and yield its description to the test.

    The file system is created with the creation token ``"foobarbaz"``.
    The ``ResponseMetadata`` entry is stripped from the response before it
    is yielded, so tests only see the remaining fields.
    """
    description = efs.create_file_system(CreationToken="foobarbaz")
    # Raises KeyError if the response carries no metadata entry.
    del description["ResponseMetadata"]
    yield description
|
|
|
|
|
|
def test_describe_access_points__initial(efs):
    """Listing access points when this test has created none yields an empty list."""
    listed = efs.describe_access_points()["AccessPoints"]
    assert listed == []
|
|
|
|
|
|
def test_create_access_point__simple(efs, file_system):
    """Create an access point from a client token and file system id only.

    Checks the fields of the creation response, including the root
    directory reported when none was passed in the request.
    """
    file_system_id = file_system["FileSystemId"]

    created = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)

    # Values that come straight from the request.
    assert created["ClientToken"] == "ct"
    assert created["FileSystemId"] == file_system_id

    # Identifiers are only checked for presence, not for a specific value.
    for generated_key in ("AccessPointId", "AccessPointArn"):
        assert generated_key in created

    assert created["OwnerId"] == ACCOUNT_ID
    assert created["LifeCycleState"] == "available"

    # No RootDirectory was passed; the response must report the "/" path.
    assert created["RootDirectory"] == {"Path": "/"}
|
|
|
|
|
|
def test_create_access_point__full(efs, file_system):
    """Create an access point with tags, a POSIX user and a root directory.

    Checks that the creation response carries the supplied values, the
    expected owner and lifecycle state, and the generated identifiers.
    """
    file_system_id = file_system["FileSystemId"]

    created = efs.create_access_point(
        ClientToken="ct",
        Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}],
        FileSystemId=file_system_id,
        PosixUser={"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
        RootDirectory={
            "Path": "/root/path",
            "CreationInfo": {
                "OwnerUid": 987,
                "OwnerGid": 986,
                "Permissions": "root_permissions",
            },
        },
    )

    # Identifiers are only checked for presence, not for a specific value.
    for generated_key in ("AccessPointId", "AccessPointArn"):
        assert generated_key in created

    # Every other field is compared against an exact expected value.
    expected_fields = {
        "ClientToken": "ct",
        # The request has no Name argument; "myname" only appears as the
        # value of the "Name" tag.
        "Name": "myname",
        "Tags": [
            {"Key": "key", "Value": "value"},
            {"Key": "Name", "Value": "myname"},
        ],
        "FileSystemId": file_system_id,
        "PosixUser": {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]},
        "RootDirectory": {
            "Path": "/root/path",
            "CreationInfo": {
                "OwnerUid": 987,
                "OwnerGid": 986,
                "Permissions": "root_permissions",
            },
        },
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
    }
    for field, expected_value in expected_fields.items():
        assert created[field] == expected_value
|
|
|
|
|
|
def test_describe_access_point(efs, file_system):
    """Describe a single access point by its id and check the returned fields.

    One access point is created on the fixture's file system and then
    looked up with the ``AccessPointId`` filter. Exactly one entry must be
    returned, and it must be the access point that was created.
    """
    fs_id = file_system["FileSystemId"]

    access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    resp = efs.describe_access_points(AccessPointId=access_point_id)
    assert len(resp["AccessPoints"]) == 1
    access_point = resp["AccessPoints"][0]

    assert access_point["ClientToken"] == "ct"
    # Compare against the created id rather than only checking the key is
    # present, so a lookup that returns some other access point fails.
    assert access_point["AccessPointId"] == access_point_id
    assert "AccessPointArn" in access_point
    assert access_point["FileSystemId"] == fs_id
    assert access_point["OwnerId"] == ACCOUNT_ID
    assert access_point["LifeCycleState"] == "available"
|
|
|
|
|
|
def test_describe_access_points__multiple(efs, file_system):
    """Two access points created on one file system are both listed."""
    file_system_id = file_system["FileSystemId"]

    for client_token in ("ct1", "ct2"):
        efs.create_access_point(ClientToken=client_token, FileSystemId=file_system_id)

    listed = efs.describe_access_points()["AccessPoints"]
    assert len(listed) == 2
|
|
|
|
|
|
def test_describe_access_points__filters(efs):
    """Filter the access point listing by file system id and by access point id.

    Two file systems are created, each with one access point. Each filter
    must return exactly the one matching access point and no ``NextToken``.
    """
    fs_id1 = efs.create_file_system(CreationToken="foobarbaz")["FileSystemId"]
    fs_id2 = efs.create_file_system(CreationToken="bazbarfoo")["FileSystemId"]

    first = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id1)
    ap_id1 = first["AccessPointId"]
    second = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id2)
    ap_id2 = second["AccessPointId"]

    # (filter arguments, expected file system id, expected access point id)
    cases = (
        ({"FileSystemId": fs_id1}, fs_id1, ap_id1),
        ({"AccessPointId": ap_id2}, fs_id2, ap_id2),
    )
    for filter_kwargs, expected_fs_id, expected_ap_id in cases:
        resp = efs.describe_access_points(**filter_kwargs)
        assert len(resp["AccessPoints"]) == 1
        assert resp.get("NextToken") is None
        match = resp["AccessPoints"][0]
        assert match["FileSystemId"] == expected_fs_id
        assert match["AccessPointId"] == expected_ap_id
|
|
|
|
|
|
def test_delete_access_points(efs, file_system):
    """Delete one of two access points and check only the other remains.

    After the deletion, the unfiltered listing and a lookup by id must both
    return the surviving access point, and looking up the deleted id must
    raise a ``ClientError`` with code ``AccessPointNotFound``.
    """
    fs_id = file_system["FileSystemId"]

    ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
        "AccessPointId"
    ]
    ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    # Delete one access point
    efs.delete_access_point(AccessPointId=ap_id2)

    # We can only find one, and it is the one that was not deleted
    resp = efs.describe_access_points()
    assert len(resp["AccessPoints"]) == 1
    assert resp["AccessPoints"][0]["AccessPointId"] == ap_id1

    # The first one still exists; check the response instead of discarding it
    resp = efs.describe_access_points(AccessPointId=ap_id1)
    assert [ap["AccessPointId"] for ap in resp["AccessPoints"]] == [ap_id1]

    # The second one is gone
    with pytest.raises(ClientError) as exc_info:
        efs.describe_access_points(AccessPointId=ap_id2)
    err = exc_info.value.response["Error"]
    assert err["Code"] == "AccessPointNotFound"
|