# moto/tests/test_efs/test_access_points.py
#
# 128 lines
# 4.0 KiB
# Python

import pytest
from botocore.exceptions import ClientError
from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID
from . import fixture_efs # noqa # pylint: disable=unused-import
@pytest.fixture(scope="function", name="file_system")
def fixture_file_system(efs):
    """Yield the description of a file system created for the current test.

    The file system is created through the ``efs`` client with the creation
    token ``"foobarbaz"``; the ``ResponseMetadata`` entry is removed from the
    response before it is handed to the test.
    """
    file_system_description = efs.create_file_system(CreationToken="foobarbaz")
    # Remove the ResponseMetadata entry so tests only see the remaining fields.
    del file_system_description["ResponseMetadata"]
    yield file_system_description
def test_describe_access_points__initial(efs):
    """Describing access points before creating any returns an empty list."""
    access_points = efs.describe_access_points()["AccessPoints"]
    assert access_points == []
def test_create_access_point__simple(efs, file_system):
    """Create an access point with a client token and file system ID only.

    Checks that the response carries the identifier fields and the expected
    values for the remaining fields.
    """
    file_system_id = file_system["FileSystemId"]
    created = efs.create_access_point(ClientToken="ct", FileSystemId=file_system_id)

    # Only the presence of the identifier fields is checked, not their values.
    for identifier_key in ("AccessPointId", "AccessPointArn"):
        assert identifier_key in created

    expected_fields = {
        "ClientToken": "ct",
        "FileSystemId": file_system_id,
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
        "RootDirectory": {"Path": "/"},
    }
    for field_name, expected_value in expected_fields.items():
        assert created[field_name] == expected_value
def test_create_access_point__full(efs, file_system):
    """Create an access point with tags, a POSIX user and a root directory.

    Checks that every supplied setting is echoed back in the response, that
    ``Name`` matches the value of the ``Name`` tag, and that the identifier,
    owner and life cycle fields are present with the expected values.
    """
    file_system_id = file_system["FileSystemId"]

    # Request settings are named once and reused for the expectations below.
    tags = [{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}]
    posix_user = {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]}
    root_directory = {
        "Path": "/root/path",
        "CreationInfo": {
            "OwnerUid": 987,
            "OwnerGid": 986,
            "Permissions": "root_permissions",
        },
    }

    created = efs.create_access_point(
        ClientToken="ct",
        Tags=tags,
        FileSystemId=file_system_id,
        PosixUser=posix_user,
        RootDirectory=root_directory,
    )

    # Only the presence of the identifier fields is checked, not their values.
    for identifier_key in ("AccessPointId", "AccessPointArn"):
        assert identifier_key in created

    expected_fields = {
        "ClientToken": "ct",
        "Name": "myname",
        "Tags": tags,
        "FileSystemId": file_system_id,
        "PosixUser": posix_user,
        "RootDirectory": root_directory,
        "OwnerId": ACCOUNT_ID,
        "LifeCycleState": "available",
    }
    for field_name, expected_value in expected_fields.items():
        assert created[field_name] == expected_value
def test_describe_access_point(efs, file_system):
    """Describe a single access point by its ID.

    Creates one access point, looks it up with ``AccessPointId`` and checks
    that exactly one entry comes back, that it is the access point that was
    requested, and that its other fields have the expected values.
    """
    fs_id = file_system["FileSystemId"]
    access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    resp = efs.describe_access_points(AccessPointId=access_point_id)
    assert len(resp["AccessPoints"]) == 1
    access_point = resp["AccessPoints"][0]

    assert access_point["ClientToken"] == "ct"
    # Compare against the requested ID rather than only checking the key
    # exists, so a lookup that returns the wrong access point fails the test.
    assert access_point["AccessPointId"] == access_point_id
    assert "AccessPointArn" in access_point
    assert access_point["FileSystemId"] == fs_id
    assert access_point["OwnerId"] == ACCOUNT_ID
    assert access_point["LifeCycleState"] == "available"
def test_describe_access_points__multiple(efs, file_system):
    """Two access points on one file system are both listed when described."""
    file_system_id = file_system["FileSystemId"]
    for client_token in ("ct1", "ct2"):
        efs.create_access_point(ClientToken=client_token, FileSystemId=file_system_id)

    described = efs.describe_access_points()["AccessPoints"]
    assert len(described) == 2
def test_delete_access_points(efs, file_system):
    """Deleting one access point removes it and leaves the other in place.

    Creates two access points, deletes the second, and checks that the first
    is the only one still listed, that it can still be looked up by ID, and
    that looking up the deleted one raises a ``ClientError`` whose error code
    is ``AccessPointNotFound``.
    """
    fs_id = file_system["FileSystemId"]
    ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[
        "AccessPointId"
    ]
    ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[
        "AccessPointId"
    ]

    # Delete one access point
    efs.delete_access_point(AccessPointId=ap_id2)

    # We can only find one, and it must be the one that was not deleted
    resp = efs.describe_access_points()
    assert [ap["AccessPointId"] for ap in resp["AccessPoints"]] == [ap_id1]

    # The first one still exists and is returned when looked up by its ID
    resp = efs.describe_access_points(AccessPointId=ap_id1)
    assert [ap["AccessPointId"] for ap in resp["AccessPoints"]] == [ap_id1]

    # The second one is gone
    with pytest.raises(ClientError) as exc_info:
        efs.describe_access_points(AccessPointId=ap_id2)
    # exc_info.value is only populated once the ``with`` block has exited.
    err = exc_info.value.response["Error"]
    assert err["Code"] == "AccessPointNotFound"