| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | import pytest | 
					
						
							|  |  |  | from botocore.exceptions import ClientError | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-08-13 09:49:43 +00:00
										 |  |  | from moto.core import DEFAULT_ACCOUNT_ID as ACCOUNT_ID | 
					
						
							| 
									
										
										
										
											2022-10-04 16:28:30 +00:00
										 |  |  | from . import fixture_efs  # noqa # pylint: disable=unused-import | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-10-04 16:28:30 +00:00
										 |  |  | @pytest.fixture(scope="function", name="file_system") | 
					
						
							|  |  |  | def fixture_file_system(efs): | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  |     create_fs_resp = efs.create_file_system(CreationToken="foobarbaz") | 
					
						
							|  |  |  |     create_fs_resp.pop("ResponseMetadata") | 
					
						
							|  |  |  |     yield create_fs_resp | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_describe_access_points__initial(efs): | 
					
						
							|  |  |  |     resp = efs.describe_access_points() | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert resp["AccessPoints"] == [] | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_create_access_point__simple(efs, file_system): | 
					
						
							|  |  |  |     fs_id = file_system["FileSystemId"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id) | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert resp["ClientToken"] == "ct" | 
					
						
							|  |  |  |     assert "AccessPointId" in resp | 
					
						
							|  |  |  |     assert "AccessPointArn" in resp | 
					
						
							|  |  |  |     assert resp["FileSystemId"] == fs_id | 
					
						
							|  |  |  |     assert resp["OwnerId"] == ACCOUNT_ID | 
					
						
							|  |  |  |     assert resp["LifeCycleState"] == "available" | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert resp["RootDirectory"] == {"Path": "/"} | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_create_access_point__full(efs, file_system): | 
					
						
							|  |  |  |     fs_id = file_system["FileSystemId"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = efs.create_access_point( | 
					
						
							|  |  |  |         ClientToken="ct", | 
					
						
							|  |  |  |         Tags=[{"Key": "key", "Value": "value"}, {"Key": "Name", "Value": "myname"}], | 
					
						
							|  |  |  |         FileSystemId=fs_id, | 
					
						
							|  |  |  |         PosixUser={"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]}, | 
					
						
							|  |  |  |         RootDirectory={ | 
					
						
							|  |  |  |             "Path": "/root/path", | 
					
						
							|  |  |  |             "CreationInfo": { | 
					
						
							|  |  |  |                 "OwnerUid": 987, | 
					
						
							|  |  |  |                 "OwnerGid": 986, | 
					
						
							|  |  |  |                 "Permissions": "root_permissions", | 
					
						
							|  |  |  |             }, | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert resp["ClientToken"] == "ct" | 
					
						
							|  |  |  |     assert resp["Name"] == "myname" | 
					
						
							|  |  |  |     assert resp["Tags"] == [ | 
					
						
							|  |  |  |         {"Key": "key", "Value": "value"}, | 
					
						
							|  |  |  |         {"Key": "Name", "Value": "myname"}, | 
					
						
							|  |  |  |     ] | 
					
						
							|  |  |  |     assert "AccessPointId" in resp | 
					
						
							|  |  |  |     assert "AccessPointArn" in resp | 
					
						
							|  |  |  |     assert resp["FileSystemId"] == fs_id | 
					
						
							|  |  |  |     assert resp["PosixUser"] == {"Uid": 123, "Gid": 123, "SecondaryGids": [124, 125]} | 
					
						
							|  |  |  |     assert resp["RootDirectory"] == { | 
					
						
							|  |  |  |         "Path": "/root/path", | 
					
						
							|  |  |  |         "CreationInfo": { | 
					
						
							|  |  |  |             "OwnerUid": 987, | 
					
						
							|  |  |  |             "OwnerGid": 986, | 
					
						
							|  |  |  |             "Permissions": "root_permissions", | 
					
						
							|  |  |  |         }, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |     assert resp["OwnerId"] == ACCOUNT_ID | 
					
						
							|  |  |  |     assert resp["LifeCycleState"] == "available" | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_describe_access_point(efs, file_system): | 
					
						
							|  |  |  |     fs_id = file_system["FileSystemId"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     access_point_id = efs.create_access_point(ClientToken="ct", FileSystemId=fs_id)[ | 
					
						
							|  |  |  |         "AccessPointId" | 
					
						
							|  |  |  |     ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = efs.describe_access_points(AccessPointId=access_point_id) | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert len(resp["AccessPoints"]) == 1 | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  |     access_point = resp["AccessPoints"][0] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert access_point["ClientToken"] == "ct" | 
					
						
							|  |  |  |     assert "AccessPointId" in access_point | 
					
						
							|  |  |  |     assert "AccessPointArn" in access_point | 
					
						
							|  |  |  |     assert access_point["FileSystemId"] == fs_id | 
					
						
							|  |  |  |     assert access_point["OwnerId"] == ACCOUNT_ID | 
					
						
							|  |  |  |     assert access_point["LifeCycleState"] == "available" | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_describe_access_points__multiple(efs, file_system): | 
					
						
							|  |  |  |     fs_id = file_system["FileSystemId"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id) | 
					
						
							|  |  |  |     efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     resp = efs.describe_access_points() | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert len(resp["AccessPoints"]) == 2 | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def test_delete_access_points(efs, file_system): | 
					
						
							|  |  |  |     fs_id = file_system["FileSystemId"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     ap_id1 = efs.create_access_point(ClientToken="ct1", FileSystemId=fs_id)[ | 
					
						
							|  |  |  |         "AccessPointId" | 
					
						
							|  |  |  |     ] | 
					
						
							|  |  |  |     ap_id2 = efs.create_access_point(ClientToken="ct2", FileSystemId=fs_id)[ | 
					
						
							|  |  |  |         "AccessPointId" | 
					
						
							|  |  |  |     ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Delete one access point | 
					
						
							|  |  |  |     efs.delete_access_point(AccessPointId=ap_id2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # We can only find one | 
					
						
							|  |  |  |     resp = efs.describe_access_points() | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert len(resp["AccessPoints"]) == 1 | 
					
						
							| 
									
										
										
										
											2022-03-15 18:51:03 -01:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # The first one still exists | 
					
						
							|  |  |  |     efs.describe_access_points(AccessPointId=ap_id1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # The second one is gone | 
					
						
							|  |  |  |     with pytest.raises(ClientError) as exc_info: | 
					
						
							|  |  |  |         efs.describe_access_points(AccessPointId=ap_id2) | 
					
						
							|  |  |  |     err = exc_info.value.response["Error"] | 
					
						
							| 
									
										
										
										
											2023-07-05 22:47:50 +00:00
										 |  |  |     assert err["Code"] == "AccessPointNotFound" |